Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Request(
stream_id=1,
http_version="2",
headers=[(b"sec-websocket-version", b"13")],
raw_path=b"/",
method="GET",
)
)
await stream.app_send({"type": "websocket.accept"})
await stream.app_send({"type": "websocket.send", "text": "hello"})
await stream.app_send({"type": "websocket.close"})
stream.send.assert_called()
assert stream.send.call_args_list == [
call(Response(stream_id=1, headers=[], status_code=200)),
call(Data(stream_id=1, data=b"\x81\x05hello")),
call(Data(stream_id=1, data=b"\x88\x02\x03\xe8")),
call(EndData(stream_id=1)),
]
async def test_protocol_send_data(protocol: H11Protocol) -> None:
    """Check that a Data event given to stream_send reaches send as RawData."""
    payload = b"hello"
    await protocol.stream_send(Data(stream_id=1, data=payload))

    protocol.send.assert_called()
    expected_calls = [call(RawData(data=payload))]
    assert protocol.send.call_args_list == expected_calls
async def test_handle_connection(stream: WSStream) -> None:
    """Check that an incoming text frame is delivered to the app as websocket.receive.

    The stream is given a handshake request, the app accepts it, and then a
    single frame carrying "hello" is fed in; the only message put to the app
    must be the matching receive message.
    """
    handshake = Request(
        stream_id=1,
        http_version="2",
        headers=[(b"sec-websocket-version", b"13")],
        raw_path=b"/?a=b",
        method="GET",
    )
    await stream.handle(handshake)
    await stream.app_send({"type": "websocket.accept"})

    # Swap in a mock only now, so that just the frame handled below is recorded.
    stream.app_put = CoroutineMock()

    # 0x81 = FIN + text opcode, 0x85 = masked + length 5, followed by the
    # 4-byte mask key and the masked payload, which unmasks to "hello".
    masked_text_frame = b"\x81\x85&`\x13\x0eN\x05\x7fbI"
    await stream.handle(Data(stream_id=1, data=masked_text_frame))

    stream.app_put.assert_called()
    expected_calls = [call({"type": "websocket.receive", "bytes": None, "text": "hello"})]
    assert stream.app_put.call_args_list == expected_calls
def next_event(self) -> Data:
    """Drain the buffer into a single Data event.

    Returns a Data event for STREAM_ID holding a copy of everything currently
    in ``self.buffer`` and resets the buffer. When the buffer is empty the
    ``h11.NEED_DATA`` sentinel is returned instead (note that this is not
    reflected in the return annotation).
    """
    if not self.buffer:
        return h11.NEED_DATA

    # Build the event before replacing the buffer so the bytes are copied first.
    drained = Data(stream_id=STREAM_ID, data=bytes(self.buffer))
    self.buffer = bytearray()
    return drained
async def stream_send(self, event: StreamEvent) -> None:
    """Translate a stream event into calls on ``self.connection``.

    * Response: sends a header block made of the ``:status`` pseudo-header,
      the event's headers and the configured "h3" response headers.
    * Body / Data: sends the event's data without ending the stream.
    * EndBody / EndData: sends an empty data frame that ends the stream.
    * StreamClosed: currently a no-op.
    * Request: delegates to ``self._create_server_push``.

    Every branch that writes to the connection then awaits ``self.send()``.
    Events of any other type are ignored.
    """
    if isinstance(event, Response):
        status_header = (b":status", b"%d" % event.status_code)
        header_block = (
            [status_header] + event.headers + self.config.response_headers("h3")
        )
        self.connection.send_headers(event.stream_id, header_block)
        await self.send()
        return

    if isinstance(event, (Body, Data)):
        self.connection.send_data(event.stream_id, event.data, False)
        await self.send()
        return

    if isinstance(event, (EndBody, EndData)):
        # Empty payload with the end-of-stream flag set.
        self.connection.send_data(event.stream_id, b"", True)
        await self.send()
        return

    if isinstance(event, StreamClosed):
        return  # ??

    if isinstance(event, Request):
        await self._create_server_push(event.stream_id, event.raw_path, event.headers)
headers=chain(event.headers, self.config.response_headers("h11")),
status_code=event.status_code,
)
)
else:
await self._send_h11_event(
h11.InformationalResponse(
headers=chain(event.headers, self.config.response_headers("h11")),
status_code=event.status_code,
)
)
elif isinstance(event, Body):
await self._send_h11_event(h11.Data(data=event.data))
elif isinstance(event, EndBody):
await self._send_h11_event(h11.EndOfMessage())
elif isinstance(event, Data):
await self.send(RawData(data=event.data))
elif isinstance(event, EndData):
pass
elif isinstance(event, StreamClosed):
await self._maybe_recycle()
"raw_path": path,
"query_string": query_string,
"root_path": self.config.root_path,
"headers": event.headers,
"client": self.client,
"server": self.server,
"subprotocols": self.handshake.subprotocols or [],
"extensions": {"websocket.http.response": {}},
}
self.start_time = time()
if not self.handshake.is_valid():
await self._send_error_response(400)
else:
self.app_put = await self.spawn_app(self.scope, self.app_send)
await self.app_put({"type": "websocket.connect"})
elif isinstance(event, (Body, Data)):
self.connection.receive_data(event.data)
await self._handle_events()
elif isinstance(event, StreamClosed) and not self.closed:
self.closed = True
if self.app_put is not None:
if self.state in {ASGIWebsocketState.HTTPCLOSED, ASGIWebsocketState.CLOSED}:
code = CloseReason.NORMAL_CLOSURE.value
else:
code = CloseReason.ABNORMAL_CLOSURE.value
await self.app_put({"type": "websocket.disconnect", "code": code})
async def stream_send(self, event: StreamEvent) -> None:
    """Apply a stream event to the connection and its per-stream buffers.

    * Response: sends a header block made of the ``:status`` pseudo-header,
      the event's headers and the configured "h2" response headers, then
      awaits ``self._flush()``.
    * Body / Data: unblocks the stream in ``self.priority``, sets
      ``self.has_data`` and pushes the data onto the stream's buffer.
    * EndBody / EndData: marks the stream's buffer complete, unblocks the
      stream, sets ``self.has_data`` and awaits the buffer's ``drain()``.
    * StreamClosed: closes the stream and sends ``Updated()``.
    * Request: delegates to ``self._create_server_push``.

    A ``h2.exceptions.ProtocolError`` raised while handling the event is
    deliberately swallowed (see the comment in the handler).
    """
    try:
        if isinstance(event, Response):
            self.connection.send_headers(
                event.stream_id,
                [(b":status", b"%d" % event.status_code)]
                + event.headers
                + self.config.response_headers("h2"),
            )
            await self._flush()
        elif isinstance(event, (Body, Data)):
            self.priority.unblock(event.stream_id)
            await self.has_data.set()
            await self.stream_buffers[event.stream_id].push(event.data)
        elif isinstance(event, (EndBody, EndData)):
            self.stream_buffers[event.stream_id].set_complete()
            self.priority.unblock(event.stream_id)
            await self.has_data.set()
            await self.stream_buffers[event.stream_id].drain()
        elif isinstance(event, StreamClosed):
            await self._close_stream(event.stream_id)
            await self.send(Updated())
        elif isinstance(event, Request):
            await self._create_server_push(event.stream_id, event.raw_path, event.headers)
    except h2.exceptions.ProtocolError:
        # Connection has closed whilst blocked on flow control or
        # connection has advanced ahead of the last emitted event.
        # Nothing more can be sent for this event, so drop it explicitly;
        # without a statement here the handler has no body and cannot parse.
        return
async def _send_wsproto_event(self, event: WSProtoEvent) -> None:
    """Pass *event* to ``self.connection.send`` and forward the result.

    The bytes returned by the connection are wrapped in a Data event for
    ``self.stream_id`` and handed to ``self.send``.
    """
    frame_bytes = self.connection.send(event)
    outgoing = Data(stream_id=self.stream_id, data=frame_bytes)
    await self.send(outgoing)