Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_protocol_handle_request(protocol: H11Protocol) -> None:
client = h11.Connection(h11.CLIENT)
await protocol.handle(
RawData(data=client.send(h11.Request(method="GET", target="/?a=b", headers=BASIC_HEADERS)))
)
protocol.stream.handle.assert_called()
assert protocol.stream.handle.call_args_list == [
call(
Request(
stream_id=1,
headers=[(b"host", b"hypercorn"), (b"connection", b"close")],
http_version="1.1",
method="GET",
raw_path=b"/?a=b",
)
),
call(EndBody(stream_id=1)),
]
def _make_connection_request(request_headers: Headers, method: str = "GET") -> Request:
client = h11.Connection(h11.CLIENT)
server = WSConnection(SERVER)
server.receive_data(
client.send(h11.Request(method=method, target="/", headers=request_headers))
)
event = next(server.events())
assert isinstance(event, Request)
return event
async def test_http1_request(event_loop: asyncio.AbstractEventLoop) -> None:
server = TCPServer(
sanity_framework, event_loop, Config(), MemoryReader(), MemoryWriter() # type: ignore
)
asyncio.ensure_future(server.run())
client = h11.Connection(h11.CLIENT)
await server.reader.send( # type: ignore
client.send(
h11.Request(
method="POST",
target="/",
headers=[
(b"host", b"hypercorn"),
(b"connection", b"close"),
(b"content-length", b"%d" % len(SANITY_BODY)),
],
)
)
)
await server.reader.send(client.send(h11.Data(data=SANITY_BODY))) # type: ignore
await server.reader.send(client.send(h11.EndOfMessage())) # type: ignore
events = []
while True:
event = client.next_event()
if event == h11.NEED_DATA:
def __init__(
self,
path: str,
event_loop: asyncio.AbstractEventLoop,
*,
framework: ASGIFramework = echo_framework,
) -> None:
self.transport = MockTransport()
self.client = h11.Connection(h11.CLIENT)
self.server = WebsocketServer( # type: ignore
framework, event_loop, Config(), self.transport
)
self.server.data_received(
self.client.send(
h11.Request(
method="GET",
target=path,
headers=[
("Host", "Hypercorn"),
("Upgrade", "WebSocket"),
("Connection", "Upgrade"),
("Sec-WebSocket-Version", "13"),
("Sec-WebSocket-Key", "121312"),
],
async def test_requests(method: str, headers: list, body: str) -> None:
connection = MockConnection()
await connection.send(h11.Request(method=method, target="/", headers=headers))
await connection.send(h11.Data(data=body.encode()))
await connection.send(h11.EndOfMessage())
await connection.server.handle_connection()
response, *data, end = await connection.get_events()
assert isinstance(response, h11.Response)
assert response.status_code == 200
assert (b"server", b"hypercorn-h11") in response.headers
assert b"date" in (header[0] for header in response.headers)
assert all(isinstance(datum, h11.Data) for datum in data)
data = json.loads(b"".join(datum.data for datum in data).decode())
assert data["request_body"] == body
assert isinstance(end, h11.EndOfMessage)
async def test_websocket_upgrade(event_loop: asyncio.AbstractEventLoop) -> None:
event = await _make_upgrade_request(
h11.Request(
method="GET",
target="/",
headers=[
("Host", "Hypercorn"),
("Upgrade", "WebSocket"),
("Connection", "Upgrade"),
("Sec-WebSocket-Version", "13"),
("Sec-WebSocket-Key", "121312"),
],
),
event_loop,
)
assert isinstance(event, h11.InformationalResponse)
assert event.status_code == 101
# Called on receipt of data or after recycling the connection
while True:
if self.connection.they_are_waiting_for_100_continue:
self.send(
h11.InformationalResponse(status_code=100, headers=self.response_headers())
)
try:
event = self.connection.next_event()
except h11.RemoteProtocolError:
self.send(self.error_response(400))
self.send(h11.EndOfMessage())
self.app_queue.put_nowait({"type": "http.disconnect"})
self.close()
break
else:
if isinstance(event, h11.Request):
self.stop_keep_alive_timeout()
try:
self.raise_if_upgrade(event, self.connection.trailing_data[0])
except H2CProtocolRequired as error:
self.send(
h11.InformationalResponse(
status_code=101,
headers=[(b"upgrade", b"h2c")] + self.response_headers(),
)
)
raise error
self.task = self.loop.create_task(self.handle_request(event))
self.task.add_done_callback(self.recycle_or_close)
elif isinstance(event, h11.EndOfMessage):
self.app_queue.put_nowait(
{"type": "http.request", "body": b"", "more_body": False}
async def generator():
h11_request = h11.Request(
method=request.method,
target=request.target,
headers=_stringify_headers(request.headers.items()),
)
yield state_machine.send(h11_request)
async for chunk in _make_body_iterable(request.body):
yield state_machine.send(h11.Data(data=chunk))
yield state_machine.send(h11.EndOfMessage())
async def _send_request(self, request: Request, timeout: Timeout) -> None:
"""
Send the request method, URL, and headers to the network.
"""
logger.trace(
f"send_headers method={request.method!r} "
f"target={request.url.full_path!r} "
f"headers={request.headers!r}"
)
method = request.method.encode("ascii")
target = request.url.full_path.encode("ascii")
headers = request.headers.raw
event = h11.Request(method=method, target=target, headers=headers)
await self._send_event(event, timeout)