Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
port = kwargs.pop("port", self.port)
username = kwargs.pop("username", self.username)
password = kwargs.pop("password", self.password)
authority = host
if port is not None:
authority += f":{port}"
if username:
userpass = username
if password:
userpass += f":{password}"
authority = f"{userpass}@{authority}"
kwargs["authority"] = authority
return URL(self._uri_reference.copy_with(**kwargs).unsplit())
def join(self, relative_url: URLTypes) -> "URL":
"""
Return an absolute URL, using given this URL as the base.
"""
if self.is_relative_url:
return URL(relative_url)
# We drop any fragment portion, because RFC 3986 strictly
# treats URLs with a fragment portion as not being absolute URLs.
base_uri = self._uri_reference.copy_with(fragment=None)
relative_url = URL(relative_url, allow_relative=True)
return URL(relative_url._uri_reference.resolve_with(base_uri).unsplit())
def redirect_url(self, request: AsyncRequest, response: AsyncResponse) -> URL:
"""
Return the URL for the redirect to follow.
"""
location = response.headers["Location"]
url = URL(location, allow_relative=True)
# Facilitate relative 'Location' headers, as allowed by RFC 7231.
# (e.g. '/path/to/resource' instead of 'http://domain.tld/path/to/resource')
if url.is_relative_url:
url = request.url.join(url)
# Attach previous fragment if needed (RFC 7231 7.1.2)
if request.url.fragment and not url.fragment:
url = url.copy_with(fragment=request.url.fragment)
return url
def redirect_url(self, request: Request, response: Response) -> URL:
"""
Return the URL for the redirect to follow.
"""
location = response.headers["Location"]
url = URL(location, allow_relative=True)
# Facilitate relative 'Location' headers, as allowed by RFC 7231.
# (e.g. '/path/to/resource' instead of 'http://domain.tld/path/to/resource')
if url.is_relative_url:
url = request.url.join(url)
# Attach previous fragment if needed (RFC 7231 7.1.2)
if request.url.fragment and not url.fragment:
url = url.copy_with(fragment=request.url.fragment)
return url
def __init__(self, url: URLTypes) -> None:
if not isinstance(url, URL):
url = URL(url)
self.scheme = url.scheme
self.is_ssl = url.is_ssl
self.host = url.host
self.port = url.port
def _proxy_from_url(url: URLTypes) -> Dispatcher:
nonlocal verify, cert, http2, pool_limits, backend, trust_env
url = URL(url)
if url.scheme in ("http", "https"):
return HTTPProxy(
url,
verify=verify,
cert=cert,
pool_limits=pool_limits,
backend=backend,
trust_env=trust_env,
http2=http2,
)
raise ValueError(f"Unknown proxy for {url!r}")
def __init__(
self,
method: str,
url: typing.Union[str, URL],
*,
params: QueryParamTypes = None,
headers: HeaderTypes = None,
cookies: CookieTypes = None,
data: RequestData = None,
files: RequestFiles = None,
json: typing.Any = None,
stream: ContentStream = None,
):
self.method = method.upper()
self.url = URL(url, params=params)
self.headers = Headers(headers)
if cookies:
self._cookies = Cookies(cookies)
self._cookies.set_cookie_header(self)
if stream is not None:
self.stream = stream
else:
self.stream = encode(data, files, json)
self.prepare()
"Use a plain string instead. proxy_mode='FORWARD_ONLY', or "
"proxy_mode='TUNNEL_ONLY'."
)
proxy_mode = proxy_mode.value
assert proxy_mode in ("DEFAULT", "FORWARD_ONLY", "TUNNEL_ONLY")
super(HTTPProxy, self).__init__(
verify=verify,
cert=cert,
pool_limits=pool_limits,
backend=backend,
trust_env=trust_env,
http2=http2,
)
self.proxy_url = URL(proxy_url)
self.proxy_mode = proxy_mode
self.proxy_headers = Headers(proxy_headers)
url = self.proxy_url
if url.username or url.password:
self.proxy_headers.setdefault(
"Proxy-Authorization",
self.build_auth_header(url.username, url.password),
)
# Remove userinfo from the URL authority, e.g.:
# 'username:password@proxy_host:proxy_port' -> 'proxy_host:proxy_port'
credentials, _, authority = url.authority.rpartition("@")
self.proxy_url = url.copy_with(authority=authority)
if app is not None:
dispatch = ASGIDispatch(app=app)
if dispatch is None:
dispatch = ConnectionPool(
verify=verify,
cert=cert,
http2=http2,
pool_limits=pool_limits,
backend=backend,
trust_env=trust_env,
uds=uds,
)
if base_url is None:
self.base_url = URL("", allow_relative=True)
else:
self.base_url = URL(base_url)
if params is None:
params = {}
self.auth = auth
self._params = QueryParams(params)
self._headers = Headers(headers)
self._cookies = Cookies(cookies)
self.timeout = Timeout(timeout)
self.max_redirects = max_redirects
self.trust_env = trust_env
self.dispatch = dispatch
self.netrc = NetRCInfo()