Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@functools.wraps(method)
def wrapper(self, *args, **kwargs):
    """Run ``method`` only for a signed-in user holding ``codename`` or 'admin'.

    Anonymous GET/HEAD requests are redirected to the login URL; any other
    anonymous request raises ``HTTPError(403)``.  A signed-in user with no
    matching permission in any of their groups is redirected to the
    no-permission URL instead of reaching ``method``.
    """
    if not self.current_user:
        if self.request.method not in ("GET", "HEAD"):
            raise HTTPError(403)
        login_url = self.get_login_url()
        if "?" not in login_url:
            # An absolute login URL needs an absolute return address as well.
            if urlparse.urlsplit(login_url).scheme:
                return_to = self.request.full_url()
            else:
                return_to = self.request.uri
            login_url += "?" + urllib.urlencode({"next": return_to})
        self.redirect(login_url)
        return

    # Signed in: permissions are looked up through the user's groups.
    has_permission = any(
        perm.codename == codename or perm.codename == 'admin'
        for group in self.current_user.groups
        for perm in group.permissions
    )
    if has_permission:
        return method(self, *args, **kwargs)

    denied_url = self.get_no_permission_url() + ("?codenames=%s" % codename)
    return self.redirect(denied_url)
raise HTTPError(400, "No data")
# Decode data to binary form
data = [ord(c) for c in data[0].decode("hex")]
# Get modem id
modem_ids = self.get_arguments(Q_CPE_ID)
if not modem_ids or not modem_ids[0]:
metrics["webcollector_strizh_no_modem_id"] += 1
self.logger.info("No modem_id")
raise HTTPError(400, "Invalid modem id")
cpe_id = modem_ids[0]
# Get managed object by CPE ID
mo = ManagedObject.get_by_global_cpe_id(cpe_id)
if not mo:
self.logger.info("Invalid CPE: %s", cpe_id)
metrics["webcollector_strizh_invalid_cpe"] += 1
raise HTTPError(404, "Invalid CPE")
# @todo: Check profile
if mo.profile.name != Q_CPE_PROFILE:
self.logger.info("Invalid Profile: %s", mo.profile.name)
metrics["webcollector_strizh_invalid_profile"] += 1
raise HTTPError(404, "Invalid Profile")
# Process data
handler = getattr(self, "handle_0x%x" % data[0], None)
if not handler:
self.logger.info("Unklnown message type %x", data[0])
metrics["webcollector_strizh_unknown_type"] += 1
raise HTTPError(400, "Unknown message type")
handler(mo, data)
self.write("OK")
def authenticate(self, handler, data=None):
    """
    Finish the OAuth exchange and work out which user is logging in.

    Written as a generator: it yields the result of the token request and
    then the result of the profile request, and expects the profile mapping
    to be sent back in.  Its final value is the profile's username, or None
    when a non-empty whitelist does not contain that username.

    Raises web.HTTPError(400) when the callback carries no "code" argument.
    """
    code = handler.get_argument("code", False)
    if not code:
        raise web.HTTPError(400, "OAuth callback made without a token")

    yield self.carina_client.request_tokens(code)
    user_profile = yield self.carina_client.get_user_profile()

    username = user_profile['username']
    # The client records the username even if the whitelist rejects it below.
    self.carina_client.user = username

    # Only whitelisted users are let in when a whitelist is configured.
    if self.whitelist and username not in self.whitelist:
        return None
    return username
request_json = json.loads(to_unicode(body))
if self.batch:
assert isinstance(request_json, list), (
"Batch requests should receive a list, but received {}."
).format(repr(request_json))
assert (
len(request_json) > 0
), "Received an empty list in the batch request."
else:
assert isinstance(
request_json, dict
), "The received data is not a valid JSON query."
self.parsed_body = request_json
return self.parsed_body
except AssertionError as e:
raise HTTPError(status_code=400, log_message=str(e))
except (TypeError, ValueError):
raise HTTPError(
status_code=400, log_message="POST body sent invalid JSON."
)
elif content_type in [
"application/x-www-form-urlencoded",
"multipart/form-data",
]:
self.parsed_body = self.request.query_arguments
return self.parsed_body
self.parsed_body = {}
return self.parsed_body
def get(self, target_id=None):
if not target_id:
raise tornado.web.HTTPError(400)
try:
filter_data = dict(self.request.arguments) # IMPORTANT!!
plugin_outputs = self.get_component("plugin_output").GetAll(filter_data, target_id=target_id)
# Group the plugin outputs to make it easier in template
grouped_plugin_outputs = {}
for poutput in plugin_outputs:
if grouped_plugin_outputs.get(poutput['plugin_code']) is None:
# No problem of overwriting
grouped_plugin_outputs[poutput['plugin_code']] = []
grouped_plugin_outputs[poutput['plugin_code']].append(poutput)
# Needed ordered list for ease in templates
grouped_plugin_outputs = collections.OrderedDict(sorted(grouped_plugin_outputs.items()))
# Get mappings
if self.get_argument("mapping", None):
mappings = self.get_component("mapping_db").GetMappings(self.get_argument("mapping", None))
def render(self):
if self.handler.breadcrumbs["name"] == 'main':
raise tornado.web.HTTPError(404)
is_owner_viewing = self.handler.is_owner_viewing(
self.handler.breadcrumbs["profile"])
if self.handler.breadcrumbs["section"] != 'main':
content_options = { 'username': self.handler.breadcrumbs["profile"],
'section': self.handler.breadcrumbs["section"],
'album': self.handler.breadcrumbs["name"],
'forum': False,
'redirect': False, }
elif self.handler.breadcrumbs["name"] != 'home':
content_options = { 'username': self.handler.breadcrumbs["profile"],
'section': self.handler.breadcrumbs["name"],
'forum': False,
'redirect': False, }
else:
params_types = getattr(operation,"_types") or [str]*len(service_params)
params_types = map(lambda x,y : y if x is None else x, params_types, [str]*len(service_params))
# produces = getattr(operation,"_produces")
services_from_request = filter(lambda x: x in path,service_name)
# query_params = getattr(operation,"_query_params")
# FIXME 为了兼容motor的异步调用逻辑,这里hack了部分pyrestful的代码
if operation._method == self.request.method and service_name == services_from_request and len(service_params) + len(service_name) == len(services_and_params):
try:
# 参数的映射关系非常粗暴,基本就是按照顺序一个一个对应起来...囧
params_values = self._find_params_value_of_url(service_name,request_path) + self._find_params_value_of_arguments(operation)
p_values = self._convert_params_values(params_values, params_types)
response = operation(*p_values)
except Exception as detail:
raise tornado.web.HTTPError(500,"Internal Server Error : %s"%detail)
def read_notebook_object(self, notebook_id):
    """Get the object representation of a notebook by notebook_id.

    Returns a ``(last_modified, notebook)`` tuple, where ``notebook`` is the
    result of parsing the stored blob as JSON and ``last_modified`` is the
    current UTC time (see the TODO below).

    Raises web.HTTPError(404) when ``notebook_exists`` reports the id as
    missing, and web.HTTPError(500) when the blob cannot be fetched or
    cannot be parsed.
    """
    if not self.notebook_exists(notebook_id):
        raise web.HTTPError(404, u'Notebook does not exist: %s' % notebook_id)
    try:
        s = self.blob_service.get_blob(self.container, notebook_id)
    except Exception:
        # Deliberately not a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit propagate instead of being reported as a server error.
        raise web.HTTPError(500, u'Notebook cannot be read.')
    try:
        # v1 and v2 notebooks are both stored as JSON in the .ipynb files.
        nb = current.reads(s, u'json')
    except Exception:
        raise web.HTTPError(500, u'Unreadable JSON notebook.')
    # TODO: The last modified time should actually be saved in the notebook
    # document. The current datetime is used until that is implemented.
    last_modified = datetime.datetime.utcnow()
    return last_modified, nb
def prepare(self):
    """Raise ``HTTPError`` carrying this handler's ``_status_code``."""
    status = self._status_code
    raise HTTPError(status)
@return RETURN: meta data parameters
"""
hostname = getNameFromSourceIP(getIP(self.request))
host = foreman.hosts[hostname]
available_meta = {
'name': host['name'],
'instance-id': host['name'],
'hostname': host['name'],
'local-hostname': host['name'],
}
if meta in available_meta.keys():
ret = available_meta[meta]
elif meta == '':
ret = "\n".join(available_meta)
else:
raise tornado.web.HTTPError(status_code=404,
log_message='No such metadata')
p.status(bool(ret), "meta data {} sent to {}"
.format(meta, hostname))
self.write(ret)