Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_handle_request_http_1(stream: HTTPStream, http_version: str) -> None:
await stream.handle(
Request(stream_id=1, http_version=http_version, headers=[], raw_path=b"/?a=b", method="GET")
)
stream.spawn_app.assert_called()
scope = stream.spawn_app.call_args[0][0]
assert scope == {
"type": "http",
"http_version": http_version,
"asgi": {"spec_version": "2.1"},
"method": "GET",
"scheme": "http",
"path": "/",
"raw_path": b"/",
"query_string": b"a=b",
"root_path": stream.config.root_path,
"headers": [],
"client": None,
"server": None,
async def test_send_push(stream: HTTPStream) -> None:
stream.scope = {"scheme": "https", "headers": [(b"host", b"hypercorn")], "http_version": "2"}
stream.stream_id = 1
await stream.app_send({"type": "http.response.push", "path": "/push", "headers": []})
assert stream.send.call_args_list == [
call(
Request(
stream_id=1,
headers=[(b":scheme", b"https"), (b":authority", b"hypercorn")],
http_version="2",
method="GET",
raw_path=b"/push",
)
async def test_send_reject(stream: WSStream) -> None:
await stream.handle(
Request(
stream_id=1,
http_version="2",
headers=[(b"sec-websocket-version", b"13")],
raw_path=b"/",
method="GET",
)
)
await stream.app_send({"type": "websocket.http.response.start", "status": 200, "headers": []})
assert stream.state == ASGIWebsocketState.HANDSHAKE
# Must wait for response before sending anything
stream.send.assert_not_called()
await stream.app_send({"type": "websocket.http.response.body", "body": b"Body"})
assert stream.state == ASGIWebsocketState.HTTPCLOSED
stream.send.assert_called()
assert stream.send.call_args_list == [
call(Response(stream_id=1, headers=[], status_code=200)),
async def handle(self, event: Event) -> None:
if isinstance(event, Request):
path, _, query_string = event.raw_path.partition(b"?")
self.scope = {
"type": "http",
"http_version": event.http_version,
"asgi": {"spec_version": "2.1"},
"method": event.method,
"scheme": self.scheme,
"path": unquote(path.decode("ascii")),
"raw_path": path,
"query_string": query_string,
"root_path": self.config.root_path,
"headers": event.headers,
"client": self.client,
"server": self.server,
}
if event.http_version in PUSH_VERSIONS:
self.spawn_app,
STREAM_ID,
)
self.connection = H11WSConnection(self.connection)
else:
self.stream = HTTPStream(
self.config,
self.ssl,
self.client,
self.server,
self.stream_send,
self.spawn_app,
STREAM_ID,
)
await self.stream.handle(
Request(
stream_id=STREAM_ID,
headers=request.headers,
http_version=request.http_version.decode(),
method=request.method.decode("ascii").upper(),
raw_path=request.target,
)
async def handle(self, event: Event) -> None:
if isinstance(event, Request):
self.handshake = Handshake(event.headers, event.http_version)
path, _, query_string = event.raw_path.partition(b"?")
self.scope = {
"type": "websocket",
"asgi": {"spec_version": "2.1"},
"scheme": self.scheme,
"http_version": event.http_version,
"path": unquote(path.decode("ascii")),
"raw_path": path,
"query_string": query_string,
"root_path": self.config.root_path,
"headers": event.headers,
"client": self.client,
"server": self.server,
"subprotocols": self.handshake.subprotocols or [],
"extensions": {"websocket.http.response": {}},
self.connection.send_headers(
event.stream_id,
[(b":status", b"%d" % event.status_code)]
+ event.headers
+ self.config.response_headers("h3"),
)
await self.send()
elif isinstance(event, (Body, Data)):
self.connection.send_data(event.stream_id, event.data, False)
await self.send()
elif isinstance(event, (EndBody, EndData)):
self.connection.send_data(event.stream_id, b"", True)
await self.send()
elif isinstance(event, StreamClosed):
pass # ??
elif isinstance(event, Request):
await self._create_server_push(event.stream_id, event.raw_path, event.headers)
self.server,
self.stream_send,
self.spawn_app,
request.stream_id,
)
self.stream_buffers[request.stream_id] = StreamBuffer(self.event_class)
try:
self.priority.insert_stream(request.stream_id)
except priority.DuplicateStreamError:
# Received PRIORITY frame before HEADERS frame
pass
else:
self.priority.block(request.stream_id)
await self.streams[request.stream_id].handle(
Request(
stream_id=request.stream_id,
headers=filter_pseudo_headers(request.headers),
http_version="2",
method=method,
raw_path=raw_path,
)
self.spawn_app,
request.stream_id,
)
else:
self.streams[request.stream_id] = HTTPStream(
self.config,
True,
self.client,
self.server,
self.stream_send,
self.spawn_app,
request.stream_id,
)
await self.streams[request.stream_id].handle(
Request(
stream_id=request.stream_id,
headers=filter_pseudo_headers(request.headers),
http_version="3",
method=method,
raw_path=raw_path,
)
else:
if message["type"] == "http.response.start" and self.state == ASGIHTTPState.REQUEST:
self.response = message
elif (
message["type"] == "http.response.push"
and self.scope["http_version"] in PUSH_VERSIONS
):
if not isinstance(message["path"], str):
raise TypeError(f"{message['path']} should be a str")
headers = [(b":scheme", self.scope["scheme"].encode())]
for name, value in self.scope["headers"]:
if name == b"host":
headers.append((b":authority", value))
headers.extend(build_and_validate_headers(message["headers"]))
await self.send(
Request(
stream_id=self.stream_id,
headers=headers,
http_version=self.scope["http_version"],
method="GET",
raw_path=message["path"].encode(),
)
)
elif message["type"] == "http.response.body" and self.state in {
ASGIHTTPState.REQUEST,
ASGIHTTPState.RESPONSE,
}:
if self.state == ASGIHTTPState.REQUEST:
headers = build_and_validate_headers(self.response.get("headers", []))
await self.send(
Response(
stream_id=self.stream_id,