Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
task = mock.Mock()
if loop is ...:
loop = mock.Mock()
loop.create_future.return_value = ()
if version < HttpVersion(1, 1):
closing = True
if headers:
headers = CIMultiDictProxy(CIMultiDict(headers))
raw_hdrs = tuple(
(k.encode('utf-8'), v.encode('utf-8')) for k, v in headers.items())
else:
headers = CIMultiDictProxy(CIMultiDict())
raw_hdrs = ()
chunked = 'chunked' in headers.get(hdrs.TRANSFER_ENCODING, '').lower()
message = RawRequestMessage(
method, path, version, headers,
raw_hdrs, closing, False, False, chunked, URL(path))
if app is None:
app = _create_app_mock()
if transport is sentinel:
transport = _create_transport(sslcontext)
if protocol is sentinel:
protocol = mock.Mock()
protocol.transport = transport
async def start(self, connection, read_until_eof=False):
nonlocal conn
conn = connection
self.status = 123
self.reason = 'Test OK'
self._headers = CIMultiDictProxy(CIMultiDict())
self.cookies = SimpleCookie()
return
async def handler(request):
assert isinstance(request.headers, CIMultiDictProxy)
return web.Response()
def test_base_ctor() -> None:
message = RawRequestMessage(
'GET', '/path/to?a=1&b=2', HttpVersion(1, 1),
CIMultiDictProxy(CIMultiDict()), (),
False, False, False, False, URL('/path/to?a=1&b=2'))
req = web.BaseRequest(message,
mock.Mock(),
mock.Mock(),
mock.Mock(),
mock.Mock(),
mock.Mock())
assert 'GET' == req.method
assert HttpVersion(1, 1) == req.version
assert req.host == socket.getfqdn()
assert '/path/to?a=1&b=2' == req.path_qs
assert '/path/to' == req.path
assert 'a=1&b=2' == req.query_string
assert CIMultiDict() == req.headers
def build_response(vcr_request, vcr_response, history):
request_info = RequestInfo(
url=URL(vcr_request.url),
method=vcr_request.method,
headers=CIMultiDictProxy(CIMultiDict(vcr_request.headers)),
real_url=URL(vcr_request.url),
)
response = MockClientResponse(vcr_request.method, URL(vcr_response.get("url")), request_info=request_info)
response.status = vcr_response["status"]["code"]
response._body = vcr_response["body"].get("string", b"")
response.reason = vcr_response["status"]["message"]
response._headers = CIMultiDictProxy(CIMultiDict(vcr_response["headers"]))
response._history = tuple(history)
response.close()
return response
"""
if self._read_bytes:
raise RuntimeError("Cannot clone request "
"after reading its content")
dct = {} # type: Dict[str, Any]
if method is not sentinel:
dct['method'] = method
if rel_url is not sentinel:
new_url = URL(rel_url)
dct['url'] = new_url
dct['path'] = str(new_url)
if headers is not sentinel:
# a copy semantic
dct['headers'] = CIMultiDictProxy(CIMultiDict(headers))
dct['raw_headers'] = tuple((k.encode('utf-8'), v.encode('utf-8'))
for k, v in headers.items())
message = self._message._replace(**dct)
kwargs = {}
if scheme is not sentinel:
kwargs['scheme'] = scheme
if host is not sentinel:
kwargs['host'] = host
if remote is not sentinel:
kwargs['remote'] = remote
return self.__class__(
message,
self._payload,
json_re = re.compile(r'^application/(?:[\w.+-]+?\+)?json')
@attr.s(frozen=True, slots=True)
class ContentDisposition:
type = attr.ib(type=str) # type: Optional[str]
parameters = attr.ib(type=MappingProxyType) # type: MappingProxyType[str, str] # noqa
filename = attr.ib(type=str) # type: Optional[str]
@attr.s(frozen=True, slots=True)
class RequestInfo:
url = attr.ib(type=URL)
method = attr.ib(type=str)
headers = attr.ib(type=CIMultiDictProxy) # type: CIMultiDictProxy[str]
real_url = attr.ib(type=URL)
@real_url.default
def real_url_default(self) -> URL:
return self.url
class Fingerprint:
HASHFUNC_BY_DIGESTLEN = {
16: md5,
20: sha1,
32: sha256,
}
def __init__(self, fingerprint: bytes) -> None:
digestlen = len(fingerprint)
def __init__(self, *, body: Optional[Union[bytes, str]] = None, status: int = 200, reason: Optional[str] = None, headers: Optional[Union[Dict, CIMultiDict, CIMultiDictProxy]] = None, content_type: Optional[str] = None, charset: Optional[str] = None) -> None:
if headers is None:
headers = CIMultiDict()
elif not isinstance(headers, (CIMultiDict, CIMultiDictProxy)):
headers = CIMultiDict(headers)
self._body = body
self._status = status
self._reason = reason
self._headers = headers
self.content_type = content_type if hdrs.CONTENT_TYPE not in headers else None
self.charset = charset if hdrs.CONTENT_TYPE not in headers else None
self.missing_content_type = hdrs.CONTENT_TYPE not in headers and not content_type and not charset
break
if self._continue is not None and not self._continue.done():
self._continue.set_result(True)
self._continue = None
# payload eof handler
payload.on_eof(self._response_eof)
# response status
self.version = message.version
self.status = message.code
self.reason = message.reason
# headers
self.headers = CIMultiDictProxy(message.headers)
self.raw_headers = tuple(message.raw_headers)
# payload
self.content = payload
# cookies
for hdr in self.headers.getall(hdrs.SET_COOKIE, ()):
try:
self.cookies.load(hdr)
except CookieError as exc:
client_logger.warning(
'Can not load response cookies: %s', exc)
return self