Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
http_version="2",
headers=[(b"sec-websocket-version", b"13")],
raw_path=b"/",
method="GET",
)
)
await stream.app_send({"type": "websocket.http.response.start", "status": 200, "headers": []})
assert stream.state == ASGIWebsocketState.HANDSHAKE
# Must wait for response before sending anything
stream.send.assert_not_called()
await stream.app_send({"type": "websocket.http.response.body", "body": b"Body"})
assert stream.state == ASGIWebsocketState.HTTPCLOSED
stream.send.assert_called()
assert stream.send.call_args_list == [
call(Response(stream_id=1, headers=[], status_code=200)),
call(Body(stream_id=1, data=b"Body")),
call(EndBody(stream_id=1)),
]
stream.config._log.access.assert_called()
async def test_handle_body(stream: HTTPStream) -> None:
await stream.handle(Body(stream_id=1, data=b"data"))
stream.app_put.assert_called()
assert stream.app_put.call_args_list == [
call({"type": "http.request", "body": b"data", "more_body": True})
]
async def test_send_response(stream: HTTPStream) -> None:
await stream.handle(
Request(stream_id=1, http_version="2", headers=[], raw_path=b"/?a=b", method="GET")
)
await stream.app_send({"type": "http.response.start", "status": 200, "headers": []})
assert stream.state == ASGIHTTPState.REQUEST
# Must wait for response before sending anything
stream.send.assert_not_called()
await stream.app_send({"type": "http.response.body", "body": b"Body"})
assert stream.state == ASGIHTTPState.CLOSED
stream.send.assert_called()
assert stream.send.call_args_list == [
call(Response(stream_id=1, headers=[], status_code=200)),
call(Body(stream_id=1, data=b"Body")),
call(EndBody(stream_id=1)),
call(StreamClosed(stream_id=1)),
]
stream.config._log.access.assert_called()
async def handle(self, quic_event: QuicEvent) -> None:
for event in self.connection.handle_event(quic_event):
if isinstance(event, HeadersReceived):
await self._create_stream(event)
if event.stream_ended:
await self.streams[event.stream_id].handle(EndBody(stream_id=event.stream_id))
elif isinstance(event, DataReceived):
await self.streams[event.stream_id].handle(
Body(stream_id=event.stream_id, data=event.data)
)
if event.stream_ended:
await self.streams[event.stream_id].handle(EndBody(stream_id=event.stream_id))
async def _send_rejection(self, message: dict) -> None:
body_suppressed = suppress_body("GET", self.response["status"])
if self.state == ASGIWebsocketState.HANDSHAKE:
headers = build_and_validate_headers(self.response["headers"])
await self.send(
Response(
stream_id=self.stream_id,
status_code=int(self.response["status"]),
headers=headers,
)
)
self.state = ASGIWebsocketState.RESPONSE
if not body_suppressed:
await self.send(Body(stream_id=self.stream_id, data=bytes(message.get("body", b""))))
if not message.get("more_body", False):
self.state = ASGIWebsocketState.HTTPCLOSED
await self.send(EndBody(stream_id=self.stream_id))
await self.config.log.access(self.scope, self.response, time() - self.start_time)
headers = build_and_validate_headers(self.response.get("headers", []))
await self.send(
Response(
stream_id=self.stream_id,
headers=headers,
status_code=int(self.response["status"]),
)
)
self.state = ASGIHTTPState.RESPONSE
if (
not suppress_body(self.scope["method"], int(self.response["status"]))
and message.get("body", b"") != b""
):
await self.send(
Body(stream_id=self.stream_id, data=bytes(message.get("body", b"")))
)
if not message.get("more_body", False):
if self.state != ASGIHTTPState.CLOSED:
self.state = ASGIHTTPState.CLOSED
await self.config.log.access(
self.scope, self.response, time() - self.start_time
)
await self.send(EndBody(stream_id=self.stream_id))
await self.send(StreamClosed(stream_id=self.stream_id))
else:
raise UnexpectedMessage(self.state, message["type"])
if isinstance(event, Response):
if event.status_code >= 200:
await self._send_h11_event(
h11.Response(
headers=chain(event.headers, self.config.response_headers("h11")),
status_code=event.status_code,
)
)
else:
await self._send_h11_event(
h11.InformationalResponse(
headers=chain(event.headers, self.config.response_headers("h11")),
status_code=event.status_code,
)
)
elif isinstance(event, Body):
await self._send_h11_event(h11.Data(data=event.data))
elif isinstance(event, EndBody):
await self._send_h11_event(h11.EndOfMessage())
elif isinstance(event, Data):
await self.send(RawData(data=event.data))
elif isinstance(event, EndData):
pass
elif isinstance(event, StreamClosed):
await self._maybe_recycle()
"asgi": {"spec_version": "2.1"},
"method": event.method,
"scheme": self.scheme,
"path": unquote(path.decode("ascii")),
"raw_path": path,
"query_string": query_string,
"root_path": self.config.root_path,
"headers": event.headers,
"client": self.client,
"server": self.server,
}
if event.http_version in PUSH_VERSIONS:
self.scope["extensions"] = {"http.response.push": {}}
self.start_time = time()
self.app_put = await self.spawn_app(self.scope, self.app_send)
elif isinstance(event, Body):
await self.app_put(
{"type": "http.request", "body": bytes(event.data), "more_body": True}
)
elif isinstance(event, EndBody):
await self.app_put({"type": "http.request", "body": b"", "more_body": False})
elif isinstance(event, StreamClosed) and not self.closed:
self.closed = True
if self.app_put is not None:
await self.app_put({"type": "http.disconnect"})
async def _handle_events(self, events: List[h2.events.Event]) -> None:
for event in events:
if isinstance(event, h2.events.RequestReceived):
await self._create_stream(event)
elif isinstance(event, h2.events.DataReceived):
await self.streams[event.stream_id].handle(
Body(stream_id=event.stream_id, data=event.data)
)
self.connection.acknowledge_received_data(
event.flow_controlled_length, event.stream_id
)
elif isinstance(event, h2.events.StreamEnded):
await self.streams[event.stream_id].handle(EndBody(stream_id=event.stream_id))
elif isinstance(event, h2.events.StreamReset):
await self._close_stream(event.stream_id)
await self._window_updated(event.stream_id)
elif isinstance(event, h2.events.WindowUpdated):
await self._window_updated(event.stream_id)
elif isinstance(event, h2.events.PriorityUpdated):
await self._priority_updated(event)
elif isinstance(event, h2.events.RemoteSettingsChanged):
if h2.settings.SettingCodes.INITIAL_WINDOW_SIZE in event.changed_settings:
await self._window_updated(None)