Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_websocket_consumer_connect_user_ip(
    headers, client_address, expected, tracked_requests
):
    """
    Open a websocket connection to "/basic-ws/" and check the "user_ip" tag
    recorded on the first tracked request.

    The scope is built from the supplied ``headers`` and a client tuple of
    ``(client_address, None)``; ``expected`` is the value the tag must hold.
    """
    async with app_with_scout() as app:
        websocket_scope = asgi_websocket_scope(
            path="/basic-ws/",
            headers=headers,
            client=(client_address, None),
        )
        communicator = ApplicationCommunicator(app, websocket_scope)
        await communicator.send_input({"type": "websocket.connect"})
        accept_message = await communicator.receive_output()
        # Give the application a brief moment to finish before inspecting
        # what was tracked.
        await communicator.wait(timeout=0.001)

    assert accept_message["type"] == "websocket.accept"
    first_request = tracked_requests[0]
    assert first_request.tags["user_ip"] == expected
"path": "/",
"headers": [[b"cookie", cookie]],
}
# NOTE(review): the enclosing test function and the start of the `scope`
# dict are not part of this excerpt; `app` and `scope` come from above.
# Drive the app with a bare HTTP request; the scope carries the cookie header.
instance = ApplicationCommunicator(app, scope)
await instance.send_input({"type": "http.request"})
# Wait up to 1 second for the first message sent back by the application.
output = await instance.receive_output(1)
assert 200 == output["status"]
# Try it again with a different cookie version
app = GitHubAuth(
hello_world_app,
client_id="x_client_id",
client_secret="x_client_secret",
require_auth=True,
cookie_version=2,
)
# Same scope (and therefore the same cookie) against the rebuilt app.
instance = ApplicationCommunicator(app, scope)
await instance.send_input({"type": "http.request"})
output = await instance.receive_output(1)
# With cookie_version=2 the same request is expected to produce a 302.
assert 302 == output["status"]
async def test_handler_basic():
    """
    Exercise the simplest request path: a GET with no request body.
    """
    communicator = ApplicationCommunicator(
        MockHandler,
        {"type": "http", "http_version": "1.1", "method": "GET", "path": "/test/"},
    )
    await communicator.send_input({"type": "http.request"})
    # Drain the two response messages (start, then body) before looking at
    # what the handler passed to its request class.
    for _ in range(2):
        await communicator.receive_output(1)
    request_scope, body_stream = MockHandler.request_class.call_args[0]
    body_stream.seek(0)
    received_body = body_stream.read()
    assert received_body == b""
async def test_file_response(self):
    """
    Check the response start message of a FileResponse served over ASGI.
    """
    application = get_asgi_application()
    # Construct HTTP request.
    request_scope = self._get_scope(path='/file/')
    communicator = ApplicationCommunicator(application, request_scope)
    await communicator.send_input({'type': 'http.request'})
    # Get the file content.
    with open(test_filename, 'rb') as test_file:
        file_bytes = test_file.read()
    # Read the response.
    response_start = await communicator.receive_output()
    self.assertEqual(response_start['type'], 'http.response.start')
    self.assertEqual(response_start['status'], 200)
    # The expected Content-Type differs between Windows and other platforms.
    if sys.platform == 'win32':
        content_type = b'text/plain'
    else:
        content_type = b'text/x-python'
    expected_headers = {
        (b'Content-Length', str(len(file_bytes)).encode('ascii')),
        (b'Content-Type', content_type),
        (b'Content-Disposition', b'inline; filename="urls.py"'),
    }
    self.assertEqual(set(response_start['headers']), expected_headers)
async def test_get_query_string(self):
    """
    Request '/' with the query string supplied once as bytes and once as
    str; both must produce a 200 response with the body b'Hello Andrew!'.
    """
    application = get_asgi_application()
    query_string_variants = (b'name=Andrew', 'name=Andrew')
    for query_string in query_string_variants:
        with self.subTest(query_string=query_string):
            request_scope = self._get_scope(path='/', query_string=query_string)
            communicator = ApplicationCommunicator(application, request_scope)
            await communicator.send_input({'type': 'http.request'})
            # First message: status line and headers.
            start_message = await communicator.receive_output()
            self.assertEqual(start_message['type'], 'http.response.start')
            self.assertEqual(start_message['status'], 200)
            # Second message: the response body.
            body_message = await communicator.receive_output()
            self.assertEqual(body_message['type'], 'http.response.body')
            self.assertEqual(body_message['body'], b'Hello Andrew!')
async def test_unknown_asgi_scope(tracked_requests):
    """
    Send a lifespan startup event through the instrumented app and check
    that it completes without any request being tracked.
    """
    lifespan_scope = {"type": "lifespan"}
    startup_event = {"type": "lifespan.startup"}
    with app_with_scout() as app:
        communicator = ApplicationCommunicator(app, lifespan_scope)
        await communicator.send_input(startup_event)
        reply = await communicator.receive_output()

    assert reply == {"type": "lifespan.startup.complete"}
    assert tracked_requests == []
async def test_expired_cookie_is_denied_access(
    cookie_age, should_allow, require_auth_app
):
    """
    Send a request carrying an auth cookie signed ``cookie_age`` seconds in
    the past and check the response status: 200 when ``should_allow`` is
    true, 302 otherwise.
    """
    signed_at = time.time() - cookie_age
    cookie = signed_auth_cookie_header(require_auth_app, ts=signed_at)
    request_scope = {
        "type": "http",
        "http_version": "1.0",
        "method": "GET",
        "path": "/",
        "headers": [[b"cookie", cookie]],
    }
    communicator = ApplicationCommunicator(require_auth_app, request_scope)
    await communicator.send_input({"type": "http.request"})
    response_start = await communicator.receive_output(1)
    expected_status = 200 if should_allow else 302
    assert expected_status == response_start["status"]
async def test_filtered_params(params, expected_path, tracked_requests):
    """
    Request "/" with ``params`` URL-encoded into the query string and check
    the "path" tag recorded on the first tracked request.
    """
    with app_with_scout() as app:
        encoded_query = urlencode(params).encode("utf-8")
        request_scope = asgi_http_scope(path="/", query_string=encoded_query)
        communicator = ApplicationCommunicator(app, request_scope)
        await communicator.send_input({"type": "http.request"})
        start_message = await communicator.receive_output()
        # A second message is consumed and discarded; only the start
        # message is asserted on.
        await communicator.receive_output()

    assert start_message["type"] == "http.response.start"
    assert start_message["status"] == 200
    first_request = tracked_requests[0]
    assert first_request.tags["path"] == expected_path
async def test_wsgi_multi_body():
"""
Verify that multiple http.request events with body parts are all delivered
to the WSGI application.
"""
# WSGI app under test: reads 12 bytes from wsgi.input and asserts that they
# equal the two request chunks sent below, joined together.
def wsgi_application(environ, start_response):
infp = environ["wsgi.input"]
body = infp.read(12)
assert body == b"Hello World!"
start_response("200 OK", [])
return []
# Wrap the WSGI callable so it can be driven through ApplicationCommunicator.
application = WsgiToAsgi(wsgi_application)
instance = ApplicationCommunicator(
application,
{
"type": "http",
"http_version": "1.0",
"method": "POST",
"path": "/",
"query_string": b"",
# 12 is the combined length of the two body chunks (6 + 6 bytes).
"headers": [[b"content-length", b"12"]],
},
)
# First chunk: more_body=True announces that another http.request follows.
await instance.send_input(
{"type": "http.request", "body": b"Hello ", "more_body": True}
)
# Final chunk: more_body is left out of the message.
await instance.send_input({"type": "http.request", "body": b"World!"})
# NOTE(review): the expected message dict is cut off in this excerpt.
assert (await instance.receive_output(1)) == {
path, _, query_string = path.partition("?")
query_string = query_string.encode("utf8")
if "%" in path:
raw_path = path.encode("latin-1")
else:
raw_path = quote(path, safe="/:,").encode("latin-1")
scope = {
"type": "http",
"http_version": "1.0",
"method": method,
"path": unquote(path),
"raw_path": raw_path,
"query_string": query_string,
"headers": [[b"host", b"localhost"]],
}
instance = ApplicationCommunicator(self.asgi_app, scope)
await instance.send_input({"type": "http.request"})
# First message back should be response.start with headers and status
messages = []
start = await instance.receive_output(2)
messages.append(start)
assert start["type"] == "http.response.start"
headers = dict(
[(k.decode("utf8"), v.decode("utf8")) for k, v in start["headers"]]
)
status = start["status"]
# Now loop until we run out of response.body
body = b""
while True:
message = await instance.receive_output(2)
messages.append(message)
assert message["type"] == "http.response.body"