Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def release_list(request):
    """Return release info matching the request's optional query filters.

    Reads ``product``, ``name_prefix`` and ``names_only`` from
    ``request.args``. The result of ``dbo.releases.getReleaseInfo`` is
    extended with the entries of ``releases_service.get_releases`` that
    pass the same ``product`` / ``name_prefix`` filters; both lookups run
    inside a single ``dbo.begin()`` transaction.
    """
    args = request.args

    kwargs = {}
    # These two parameters are forwarded under their own names when non-empty.
    for key in ("product", "name_prefix"):
        value = args.get(key)
        if value:
            kwargs[key] = value
    # The query parameter is "names_only" but the keyword passed on is "nameOnly".
    if args.get("names_only"):
        kwargs["nameOnly"] = True

    wanted_product = kwargs.get("product")
    wanted_prefix = kwargs.get("name_prefix")

    with dbo.begin() as trans:
        ret = dbo.releases.getReleaseInfo(**kwargs, transaction=trans)
        # `and` short-circuits, so "name" is only inspected for entries that
        # already passed the product filter.
        ret.extend(
            release
            for release in releases_service.get_releases(trans)["releases"]
            if (not wanted_product or release["product"] == wanted_product)
            and (not wanted_prefix or release["name"].startswith(wanted_prefix))
        )
    return ret
def _get_filters(obj, history_table):
    """Build the list of filter clauses for a query against ``history_table``.

    The clauses are, in order:

    * one equality clause per key of ``get_input_dict()``, comparing the
      same-named attribute of ``history_table`` with that key's value;
    * ``data_version`` is not null;
    * for each of ``product`` and ``channel`` that ``history_table`` has:
      the column is not null, plus an equality clause when the request
      supplies a non-empty value for it;
    * optional ``timestamp_from`` / ``timestamp_to`` bounds (inclusive).

    Args:
        obj: Not used by this function.
        history_table: Object exposing the history columns as attributes.

    Returns:
        A list of comparison expressions.

    Raises:
        ValueError: If ``timestamp_from`` or ``timestamp_to`` is supplied
            but is not a valid integer literal.
    """
    query = get_input_dict()
    # NOTE(review): attribute names come straight from the keys of the input
    # dict; presumably those are restricted upstream -- confirm.
    where = [getattr(history_table, f) == query.get(f) for f in query]
    where.append(history_table.data_version != null())

    # "product" and "channel" are handled identically and are only filtered
    # on when the table actually has such a column.
    for column_name in ("product", "channel"):
        if not hasattr(history_table, column_name):
            continue
        column = getattr(history_table, column_name)
        where.append(column != null())
        requested = request.args.get(column_name)
        if requested:
            where.append(column == requested)

    timestamp_from = request.args.get("timestamp_from")
    if timestamp_from:
        where.append(history_table.timestamp >= int(timestamp_from))

    timestamp_to = request.args.get("timestamp_to")
    if timestamp_to:
        where.append(history_table.timestamp <= int(timestamp_to))

    return where
def get():
    """Return one page of films, with their cast eagerly loaded, as JSON.

    Query parameters:
        paginationKey: Page number passed to ``paginate``; defaults to 1.
        pageSize: Page size passed to ``paginate``; defaults to 5.

    Both values are converted to ``int``. Query-string values arrive as
    strings, and passing them through unconverted hands ``paginate`` a
    ``str`` where a number is expected. A value that cannot be parsed as an
    integer falls back to the default.

    Returns:
        A ``(response, 200)`` tuple whose body is the serialized page items.
    """
    args = connexion.request.args
    page = args.get("paginationKey", 1, type=int)
    per_page = args.get("pageSize", 5, type=int)

    query = Film.query.options(
        joinedload(Film.cast),
    ).paginate(page, per_page)

    schema = FilmSchema()
    result = schema.dump(query.items, many=True)
    return jsonify(result), 200
def get():
    """Return one page of films, with their cast eagerly loaded, as JSON.

    Query parameters:
        paginationKey: Page number passed to ``paginate``; defaults to 1.
        pageSize: Page size passed to ``paginate``; defaults to 5.

    Both values are converted to ``int``. Query-string values arrive as
    strings, and passing them through unconverted hands ``paginate`` a
    ``str`` where a number is expected. A value that cannot be parsed as an
    integer falls back to the default.

    Returns:
        A ``(response, 200)`` tuple whose body is the serialized page items.
    """
    args = connexion.request.args
    page = args.get("paginationKey", 1, type=int)
    per_page = args.get("pageSize", 5, type=int)

    query = Film.query.options(
        joinedload(Film.cast),
    ).paginate(page, per_page)

    schema = FilmSchema()
    result = schema.dump(query.items, many=True)
    return jsonify(result), 200
query = get_input_dict()
where = [False, False]
where = [getattr(history_table, f) == query.get(f) for f in query]
where.append(history_table.data_version != null())
if hasattr(history_table, "product"):
where.append(history_table.product != null())
if request.args.get("product"):
where.append(history_table.product == request.args.get("product"))
if hasattr(history_table, "channel"):
where.append(history_table.channel != null())
if request.args.get("channel"):
where.append(history_table.channel == request.args.get("channel"))
if request.args.get("timestamp_from"):
where.append(history_table.timestamp >= int(request.args.get("timestamp_from")))
if request.args.get("timestamp_to"):
where.append(history_table.timestamp <= int(request.args.get("timestamp_to")))
return where
def release_list(request):
    """Return release info filtered by the request's optional query parameters.

    Non-empty ``product`` and ``name_prefix`` values are forwarded to
    ``dbo.releases.getReleaseInfo`` under the same names; a non-empty
    ``names_only`` is forwarded as ``nameOnly=True``.
    """
    args = request.args
    filters = {
        key: args.get(key)
        for key in ("product", "name_prefix")
        if args.get(key)
    }
    # The query parameter is "names_only" but the keyword passed on is "nameOnly".
    if args.get("names_only"):
        filters["nameOnly"] = True
    return dbo.releases.getReleaseInfo(**filters)
def get_rules():
    """Return rules as JSON, either at a point in time or filtered by product.

    With a ``timestamp`` query parameter the rules come from
    ``dbo.rules.history.getPointInTime``; otherwise they come from
    ``dbo.rules.getOrderedRules``, restricted to ``product`` when one is
    given. Supplying both parameters yields a 400 problem response.
    """
    timestamp = request.args.get("timestamp")
    product = request.args.get("product")

    # TODO: When we switch to Swagger 3, this can move to the Swagger spec
    if timestamp and product:
        return problem(status=400, title="Bad Request", detail="Cannot query with a timestamp and a product at the same time")

    if timestamp:
        rules = dbo.rules.history.getPointInTime(timestamp)
    else:
        where = {"product": product} if product else {}
        rules = dbo.rules.getOrderedRules(where=where)

    return jsonify(count=len(rules), rules=rules)
def get(self):
    """Delegate to the parent view's ``get`` with the request's filter values.

    ``product``, ``role`` and ``channel`` are read from the query string;
    a parameter that is absent is passed along as ``None``.
    """
    args = connexion.request.args
    input_dict = {key: args.get(key) for key in ("product", "role", "channel")}
    return super(ProductRequiredSignoffsHistoryAPIView, self).get(input_dict)