Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get():
    """Return one page of actors as JSON.

    The page number is read from the ``paginationKey`` query parameter
    (default 1) and the page size from ``pageSize`` (default 5).

    Returns:
        A ``(response, 200)`` tuple whose body is the serialized list of
        actors on the requested page.
    """
    args = connexion.request.args
    # Query-string values arrive as strings; convert them to integers before
    # handing them to the paginator. With ``type=int`` a value that cannot be
    # converted falls back to the default instead of raising.
    page = args.get("paginationKey", 1, type=int)
    per_page = args.get("pageSize", 5, type=int)
    query = Actor.query.paginate(page=page, per_page=per_page)
    schema = ActorSchema()
    result = schema.dump(query.items, many=True)
    return jsonify(result), 200
def get(self, input_dict):
    """Return a page of history revisions for one Required Signoff.

    The signoff is looked up by the values ``input_dict`` holds for each
    name in ``self.decisionFields``. Paging is controlled by the ``page``
    (default 1) and ``limit`` (default 100) query parameters.

    Returns:
        A 404 problem response when no matching row exists in
        ``self.table``, a 400 problem response when ``page`` or ``limit``
        is not an integer, otherwise a JSON response with ``count`` (total
        matching history rows) and ``required_signoffs`` (the requested
        page, newest timestamp first).
    """
    lookup = {name: input_dict.get(name) for name in self.decisionFields}
    if not self.table.select(lookup):
        return problem(404, "Not Found", "Requested Required Signoff does not exist")

    args = connexion.request.args
    try:
        page = int(args.get("page", 1))
        limit = int(args.get("limit", 100))
    except ValueError as msg:
        self.log.warning("Bad input: %s", msg)
        return problem(400, "Bad Request", str(msg))

    history = self.table.history

    # Only history rows that carry a data_version are counted and returned.
    count_filters = [history.data_version != null()]
    count_filters.extend(
        getattr(history, name) == input_dict.get(name) for name in self.decisionFields
    )
    total_count = history.count(where=count_filters)

    select_filters = [
        getattr(history, name) == input_dict.get(name) for name in self.decisionFields
    ]
    select_filters.append(history.data_version != null())
    revisions = history.select(
        where=select_filters,
        limit=limit,
        offset=limit * (page - 1),
        order_by=[history.timestamp.desc()],
    )
    return jsonify(count=total_count, required_signoffs=revisions)
def list_view():
    """Return one page of colors as JSON.

    The page number is read from the ``paginationKey`` query parameter
    (default 1) and the page size from ``pageSize`` (default 5).

    Returns:
        A ``(response, 200)`` tuple whose body is the serialized list of
        colors on the requested page.
    """
    current_app.logger.info("Return all color list")
    args = connexion.request.args
    # Query-string values arrive as strings; convert them to integers before
    # handing them to the paginator. With ``type=int`` a value that cannot be
    # converted falls back to the default instead of raising.
    page = args.get("paginationKey", 1, type=int)
    per_page = args.get("pageSize", 5, type=int)
    query = Color.query.paginate(page=page, per_page=per_page)
    schema = ColorSchema()
    result = schema.dump(query.items, many=True)
    return jsonify(result), 200
def get_input_dict():
    """Collect the non-reserved query parameters of the current request.

    Returns:
        A dict mapping every query-string key that is not one of the
        reserved filter parameters to the value ``request.args.get``
        returns for it.
    """
    reserved_filter_params = ["limit", "product", "channel", "page", "timestamp_from", "timestamp_to"]
    args = request.args
    return {name: args.get(name) for name in args if name not in reserved_filter_params}
def get(self, **kwargs):
    """Return release information, optionally filtered by query parameters.

    Recognized query parameters:
        product: passed to ``getReleaseInfo`` as ``product``.
        name_prefix: passed to ``getReleaseInfo`` as ``name_prefix``.
        names_only: when truthy, ``getReleaseInfo`` is called with
            ``nameOnly=True`` and sign-off requirements are not attached.

    Returns:
        The result of ``serialize_releases`` for the current request and
        the fetched releases.
    """
    args = connexion.request.args
    opts = {}
    if args.get("product"):
        opts["product"] = args.get("product")
    if args.get("name_prefix"):
        opts["name_prefix"] = args.get("name_prefix")
    if args.get("names_only"):
        opts["nameOnly"] = True
    releases = dbo.releases.getReleaseInfo(**opts)
    # The flag is stored under "nameOnly". Checking "names_only" here never
    # matched, so sign-off requirements were computed even for names-only
    # requests.
    if not opts.get("nameOnly"):
        requirements = dbo.releases.getPotentialRequiredSignoffs(releases)
        for release in releases:
            release["required_signoffs"] = serialize_signoff_requirements(requirements[release["name"]])
    return serialize_releases(connexion.request, releases)
def get(self):
    """Return scheduled changes, optionally restricted to one rule.

    When the ``rule_id`` query parameter is present and non-empty, it is
    passed to the parent view's ``get`` as a ``base_rule_id`` filter;
    otherwise the parent is called with an empty filter.
    """
    rule_id = connexion.request.args.get("rule_id")
    where = {"base_rule_id": rule_id} if rule_id else {}
    return super(RuleScheduledChangesView, self).get(where)
def get_rules():
    """Return rules, either at a point in time or filtered by product.

    With a ``timestamp`` query parameter the rules are read from the rules
    history at that point in time; otherwise the ordered rules are
    returned, filtered by ``product`` when that parameter is given.
    Supplying both parameters is rejected.

    Returns:
        A 400 problem response when both ``timestamp`` and ``product`` are
        supplied, otherwise a JSON response with ``count`` and ``rules``.
    """
    # TODO: When we switch to Swagger 3, this can move to the Swagger spec
    timestamp = request.args.get("timestamp")
    if timestamp and request.args.get("product"):
        return problem(status=400, title="Bad Request", detail="Cannot query with a timestamp and a product at the same time")

    if timestamp:
        rules = dbo.rules.history.getPointInTime(timestamp)
    else:
        where = {
            field: request.args[field]
            for field in ("product",)
            if request.args.get(field)
        }
        rules = dbo.rules.getOrderedRules(where=where)
    return jsonify(count=len(rules), rules=rules)
def get():
    """Return one page of actors as JSON.

    The page number is read from the ``paginationKey`` query parameter
    (default 1) and the page size from ``pageSize`` (default 5).

    Returns:
        A ``(response, 200)`` tuple whose body is the serialized list of
        actors on the requested page.
    """
    args = connexion.request.args
    # Query-string values arrive as strings; convert them to integers before
    # handing them to the paginator. With ``type=int`` a value that cannot be
    # converted falls back to the default instead of raising.
    page = args.get("paginationKey", 1, type=int)
    per_page = args.get("pageSize", 5, type=int)
    query = Actor.query.paginate(page=page, per_page=per_page)
    schema = ActorSchema()
    result = schema.dump(query.items, many=True)
    return jsonify(result), 200
@param process_revisions_callback: A callback that process revisions
according to the requested AUS object.
@type process_revisions_callback: callable
@param revisions_order_by: Fields list to sort history.
@type revisions_order_by: list
@param obj_not_found_msg: Error message for not found AUS object.
@type obj_not_found_msg: string
@param response_key: Dictionary key to wrap returned revisions.
@type response_key: string
"""
page = int(connexion.request.args.get("page", 1))
limit = int(connexion.request.args.get("limit", 10))
obj = get_object_callback()
if not obj:
return problem(status=404, title="Not Found", detail=obj_not_found_msg)
offset = limit * (page - 1)
filters = history_filters_callback(obj)
total_count = self.history_table.count(where=filters)
revisions = self.history_table.select(where=filters, limit=limit, offset=offset, order_by=revisions_order_by)
if process_revisions_callback:
revisions = process_revisions_callback(revisions)
ret = dict()
@type history_filters_callback: callable
@param process_revisions_callback: A callback that process revisions
according to the requested AUS object.
@type process_revisions_callback: callable
@param revisions_order_by: Fields list to sort history.
@type revisions_order_by: list
@param obj_not_found_msg: Error message for not found AUS object.
@type obj_not_found_msg: string
@param response_key: Dictionary key to wrap returned revisions.
@type response_key: string
"""
page = int(connexion.request.args.get("page", 1))
limit = int(connexion.request.args.get("limit", 10))
obj = get_object_callback()
if not obj:
return problem(status=404, title="Not Found", detail=obj_not_found_msg)
offset = limit * (page - 1)
filters = history_filters_callback(obj)
total_count = self.history_table.count(where=filters)
revisions = self.history_table.select(where=filters, limit=limit, offset=offset, order_by=revisions_order_by)
if process_revisions_callback:
revisions = process_revisions_callback(revisions)