.. code-block:: none

    auto                True if url starts with auto://, protocol
                        will be automatically switched to https
                        if http not supported;
    scheme (auto)       connection protocol (http or https);
    user (admin)        NMS user;
    password (nexenta)  NMS password;
    host (192.168.1.1)  NMS host;
    port (2000)         NMS port.

:param url: url string
:return: tuple (auto, scheme, user, password, host, port, path)
"""
pr = urlparse.urlparse(url)
scheme = pr.scheme
auto = scheme == 'auto'
if auto:
    scheme = 'http'
user = 'admin'
password = 'nexenta'
if '@' not in pr.netloc:
    host_and_port = pr.netloc
else:
    user_and_password, host_and_port = pr.netloc.split('@', 1)
    if ':' in user_and_password:
        user, password = user_and_password.split(':')
    else:
        user = user_and_password
if ':' in host_and_port:
    host, port = host_and_port.split(':', 1)
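The fragment above is only the body of the parser (the enclosing def and the final return are not shown). A minimal, self-contained Python 3 sketch of the same logic follows; the function name is made up, the 'admin'/'nexenta'/2000 defaults come from the docstring, and the path element of the documented return tuple is omitted because the fragment never shows it.

from urllib.parse import urlparse

def parse_nms_url_sketch(url):
    # Hypothetical name; defaults mirror the docstring above.
    pr = urlparse(url)
    scheme = pr.scheme
    auto = scheme == 'auto'          # auto:// means "switch to https if http is not supported"
    if auto:
        scheme = 'http'
    user, password = 'admin', 'nexenta'
    host_and_port = pr.netloc
    if '@' in pr.netloc:
        creds, host_and_port = pr.netloc.split('@', 1)
        if ':' in creds:
            user, password = creds.split(':', 1)
        else:
            user = creds
    host, _, port = host_and_port.partition(':')
    return auto, scheme, user, password, host, port or '2000'

print(parse_nms_url_sketch('auto://admin:nexenta@192.168.1.1:2000/'))
# (True, 'http', 'admin', 'nexenta', '192.168.1.1', '2000')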
def discard_meta_domain_whitelist(self, url, values):
    p_url = urlparse.urlparse(url)
    netloc = p_url.netloc.lower()
    for value in values.split(','):
        domain = value.lower().strip()
        if not domain:
            continue
        prefix = "" if domain.startswith(".") else "."
        if netloc.endswith("{}{}".format(prefix, domain)):
            log.debug("[discard_meta_domain_whitelist] Whitelisted domain: %s (URL: %s)", domain, url)
            return True
    return False
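One detail worth noting: the prefix logic forces the suffix match onto a dot boundary, so a whitelist entry of example.com matches sub.example.com but not a lookalike such as evilexample.com (and, as written, not the bare host example.com either). A short demonstration with made-up values:

domain = "example.com"
prefix = "" if domain.startswith(".") else "."
print("mail.example.com".endswith(prefix + domain))   # True: subdomain matches
print("evilexample.com".endswith(prefix + domain))    # False: lookalike host rejected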
from urllib import parse  # provides parse.urlparse (Python 3 standard library)

def get_connection_kwargs(url):
    parts = parse.urlparse(url)
    if ':' in parts.netloc:
        host, port = parts.netloc.split(':', 1)
        port = int(port)
    else:
        host = parts.netloc
        port = 8082
    return {
        'host': host,
        'port': port,
        'path': parts.path,
        'scheme': parts.scheme,
    }
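A quick usage check of the helper above; the hostname is an arbitrary example, and 8082 is the default the function falls back to when the netloc carries no port:

print(get_connection_kwargs('http://db.example.com:9000/v1/query'))
# {'host': 'db.example.com', 'port': 9000, 'path': '/v1/query', 'scheme': 'http'}
print(get_connection_kwargs('http://db.example.com/v1/query'))
# {'host': 'db.example.com', 'port': 8082, 'path': '/v1/query', 'scheme': 'http'}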
        ext=utils.toStr(self.makeHawkExt()),
        url=requestUrl,
        timestamp=expiration,
        nonce='',
        # content='',
        # content_type='',
    )
    bewit = mohawk.bewit.get_bewit(resource)
    return bewit.rstrip('=')

bewit = genBewit()
if not bewit:
    raise exceptions.TaskclusterFailure('Did not receive a bewit')

u = urllib.parse.urlparse(requestUrl)
qs = u.query
if qs:
    qs += '&'
qs += 'bewit=%s' % bewit

return urllib.parse.urlunparse((
    u.scheme,
    u.netloc,
    u.path,
    u.params,
    qs,
    u.fragment,
))
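The tail of that fragment shows a reusable pattern: parse the URL, append one more query-string parameter, and reassemble with urlunparse. A generic sketch of just that step (add_query_param is a hypothetical helper, and the value is assumed to be URL-safe, as a bewit is):

from urllib.parse import urlparse, urlunparse

def add_query_param(url, key, value):
    """Append key=value to the query string, preserving everything else."""
    u = urlparse(url)
    qs = u.query
    if qs:
        qs += '&'
    qs += '%s=%s' % (key, value)
    return urlunparse((u.scheme, u.netloc, u.path, u.params, qs, u.fragment))

print(add_query_param('https://example.com/task?run=0', 'bewit', 'abc123'))
# https://example.com/task?run=0&bewit=abc123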
def paginate_alarming(resource, uri, limit):
    parsed_uri = urlparse.urlparse(uri)
    self_link = build_base_uri(parsed_uri)
    old_query_params = _get_old_query_params(parsed_uri)
    if old_query_params:
        self_link += '?' + '&'.join(old_query_params)
    if resource and len(resource) > limit:
        old_offset = 0
        for param in old_query_params:
            if param.find('offset') >= 0:
                old_offset = int(param.split('=')[-1])
        new_offset = str(limit + old_offset)
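The fragment cuts off right after the new offset is computed, but the intent is plain offset pagination: read the current offset from the query string and advance it by limit for the next-page link. A tiny sketch of that arithmetic, ignoring the fragment's build_base_uri and _get_old_query_params helpers:

from urllib.parse import urlparse, parse_qs

def next_offset(uri, limit):
    """Return the offset the next-page link should carry."""
    query = parse_qs(urlparse(uri).query)
    old_offset = int(query.get('offset', ['0'])[0])
    return old_offset + limit

print(next_offset('http://api.example.com/v2.0/alarms?state=ALARM&offset=50', 25))  # 75
print(next_offset('http://api.example.com/v2.0/alarms', 25))                        # 25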
def mask_passwd_in_url(url):
    parsed = urlparse.urlparse(url)
    safe_netloc = re.sub(':.*@', ':****@', parsed.netloc)
    new_parsed = urlparse.ParseResult(
        parsed.scheme, safe_netloc,
        parsed.path, parsed.params,
        parsed.query, parsed.fragment)
    return urlparse.urlunparse(new_parsed)
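mask_passwd_in_url is written against the Python 2 urlparse module. A Python 3 sketch of the same idea, using only the standard library, with a usage check on a made-up database URL:

import re
from urllib.parse import urlparse, urlunparse

def mask_passwd_in_url_py3(url):
    parsed = urlparse(url)
    # Replace everything between the first ':' and the last '@' in the netloc.
    safe_netloc = re.sub(':.*@', ':****@', parsed.netloc)
    return urlunparse(parsed._replace(netloc=safe_netloc))

print(mask_passwd_in_url_py3('mysql://user:s3cret@db.example.com:3306/prod'))
# mysql://user:****@db.example.com:3306/prod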
def match_filename(url):
    """
    :return: name, ext
    """
    o = urlparse(url)
    rv = FILENAME_REGEX.search(o.path)
    if not rv:
        return None
    grps = rv.groups()
    if len(grps) != 2:
        print('Warning: match {} result not 2: {}'.format(url, grps))
        return None
    return grps
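FILENAME_REGEX is not defined in the fragment; any compiled pattern with two capture groups (name, extension) satisfies the interface. The pattern below is an illustrative assumption, not the project's actual definition:

import re
from urllib.parse import urlparse

# Hypothetical pattern: last path segment split into (name, extension).
FILENAME_REGEX = re.compile(r'([^/]+)\.([^./]+)$')

o = urlparse('https://cdn.example.com/media/2021/photo.final.jpg?size=large')
print(FILENAME_REGEX.search(o.path).groups())   # ('photo.final', 'jpg')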
`::` separator.
:raises: IOError if the file can't be loaded or path can't be found
:rtype: h5py-like node
"""
url = silx.io.url.DataUrl(filename)

if url.scheme() in [None, "file", "silx"]:
    # That's a local file
    if not url.is_valid():
        raise IOError("URL '%s' is not valid" % filename)
    h5_file = _open_local_file(url.file_path())
elif url.scheme() in ["fabio"]:
    raise IOError("URL '%s' containing fabio scheme is not supported" % filename)
else:
    # That's maybe a URL supported by h5pyd
    uri = six.moves.urllib.parse.urlparse(filename)
    if h5pyd is None:
        raise IOError("URL '%s' unsupported. Try to install h5pyd." % filename)
    path = uri.path
    endpoint = "%s://%s" % (uri.scheme, uri.netloc)
    if path.startswith("/"):
        path = path[1:]
    return h5pyd.File(path, 'r', endpoint=endpoint)

if url.data_slice():
    raise IOError("URL '%s' containing slicing is not supported" % filename)

if url.data_path() in [None, "/", ""]:
    # The full file is requested
    return h5_file
else:
    # Only a child node is requested
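In the h5pyd branch above, the urlparse call splits one URL into an endpoint (scheme plus host) and a relative file path. The same split in isolation, on a made-up endpoint URL:

from urllib.parse import urlparse

uri = urlparse('http://hdf.example.org:5101/home/shared/data.h5')
endpoint = '%s://%s' % (uri.scheme, uri.netloc)
path = uri.path
if path.startswith('/'):
    path = path[1:]
print(endpoint, path)   # http://hdf.example.org:5101 home/shared/data.h5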
auth_headers = self.get_auth_headers(auth)
if auth_headers is None:
    msg = 'No valid authentication is available'
    raise exceptions.AuthorizationFailure(msg)
headers.update(auth_headers)

if osprofiler_web:
    headers.update(osprofiler_web.get_trace_id_headers())

# if we are passed a fully qualified URL and an endpoint_filter we
# should ignore the filter. This will make it easier for clients who
# want to overrule the default endpoint_filter data added to all client
# requests. We check fully qualified here by the presence of a host.
if not urllib.parse.urlparse(url).netloc:
    base_url = None

    if endpoint_override:
        base_url = endpoint_override % _StringFormatter(self, auth)
    elif endpoint_filter:
        base_url = self.get_endpoint(auth, allow=allow,
                                     **endpoint_filter)

    if not base_url:
        raise exceptions.EndpointNotFound()

    url = '%s/%s' % (base_url.rstrip('/'), url.lstrip('/'))

if self.cert:
    kwargs.setdefault('cert', self.cert)
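The "fully qualified" test in that fragment boils down to whether urlparse finds a host, and the final join strips slashes on both sides so the two pieces meet cleanly. Both steps in isolation, with hypothetical endpoint and path values:

from urllib.parse import urlparse

def is_fully_qualified(url):
    """A URL counts as fully qualified when it carries a host (netloc)."""
    return bool(urlparse(url).netloc)

print(is_fully_qualified('https://compute.example.com/servers'))  # True
print(is_fully_qualified('/servers/detail'))                      # False

base_url = 'https://compute.example.com/v2.1/'
path = '/servers/detail'
print('%s/%s' % (base_url.rstrip('/'), path.lstrip('/')))
# https://compute.example.com/v2.1/servers/detail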
This method handles routing to sub-applications. It does simple routing
using regular expression matching.

This __call__ method conforms to the WSGI spec, so that instances of this
class are WSGI applications.

Args:
  environ: See WSGI spec.
  start_response: See WSGI spec.

Returns:
  A werkzeug Response.
"""
request = wrappers.Request(environ)
parsed_url = urlparse.urlparse(request.path)
clean_path = _clean_path(parsed_url.path, self._path_prefix)

@functools.wraps(start_response)
def new_start_response(status, headers):
    return start_response(status, self._headers_with_colab_csp(headers))

# pylint: disable=too-many-function-args
if clean_path in self.data_applications:
    return self.data_applications[clean_path](environ, new_start_response)
else:
    logger.warn('path %s not found, sending 404', clean_path)
    return http_util.Respond(request, 'Not found', 'text/plain', code=404)(
        environ, new_start_response)
# pylint: enable=too-many-function-args
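Stripped of the werkzeug and CSP details, the routing in that fragment is a dictionary lookup keyed by a cleaned path, with a 404 fallback. A minimal, framework-free WSGI sketch of that dispatch pattern; the route table and handler are invented for illustration:

def hello_app(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [b'hello']

ROUTES = {'/hello': hello_app}   # cleaned path -> WSGI sub-application

def dispatch(environ, start_response):
    path = environ.get('PATH_INFO', '') or '/'
    app = ROUTES.get(path)
    if app is not None:
        return app(environ, start_response)
    start_response('404 Not Found', [('Content-Type', 'text/plain')])
    return [b'Not found']

if __name__ == '__main__':
    from wsgiref.simple_server import make_server
    make_server('', 8000, dispatch).serve_forever()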