Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
MockHTTPStream.return_value = AsyncMock(spec=HTTPStream)
monkeypatch.setattr(hypercorn.protocol.h11, "HTTPStream", MockHTTPStream)
MockEvent = AsyncMock() # noqa: N806
MockEvent.return_value = AsyncMock(spec=IOEvent)
protocol = H11Protocol(config, False, None, None, CoroutineMock(), CoroutineMock(), MockEvent)
await protocol.handle(RawData(data=b"GET / HTTP/1.1\r\nHost: hypercorn\r\n"))
protocol.send.assert_called()
assert protocol.send.call_args_list == [
call(
RawData(
data=b"HTTP/1.1 400 \r\ncontent-length: 0\r\nconnection: close\r\n"
b"date: Thu, 01 Jan 1970 01:23:20 GMT\r\nserver: hypercorn-h11\r\n\r\n"
)
),
call(RawData(data=b"")),
call(Closed()),
]
async def handle(self, event: Event) -> None:
if isinstance(event, RawData):
self.connection.receive_data(event.data)
await self._handle_events()
elif isinstance(event, Closed):
if self.stream is not None:
await self._close_stream()
async def _consume_events(self) -> None:
while True:
event = await self.protocol_queue.get()
await self.protocol.handle(event)
if isinstance(event, Closed):
break
async def handle(self, event: Event) -> None:
if isinstance(event, RawData):
try:
events = self.connection.receive_data(event.data)
except h2.exceptions.ProtocolError:
await self._flush()
await self.send(Closed())
else:
await self._handle_events(events)
elif isinstance(event, Closed):
self.closed = True
stream_ids = list(self.streams.keys())
for stream_id in stream_ids:
await self._close_stream(stream_id)
connection = self.connections.get(header.destination_cid)
if (
connection is None
and len(event.data) >= 1200
and header.packet_type == PACKET_TYPE_INITIAL
):
connection = QuicConnection(
configuration=self.quic_config, original_connection_id=None
)
self.connections[header.destination_cid] = connection
self.connections[connection.host_cid] = connection
if connection is not None:
connection.receive_datagram(event.data, event.address, now=self.now())
await self._handle_events(connection, event.address)
elif isinstance(event, Closed):
pass
async def _read_data(self) -> None:
while True:
try:
data = await self.reader.read(MAX_RECV)
except (BrokenPipeError, ConnectionResetError):
await self.protocol.handle(Closed())
break
else:
if data == b"":
self._update_keep_alive_timeout()
break
await self.protocol.handle(RawData(data))
self._update_keep_alive_timeout()
async def protocol_send(self, event: Event) -> None:
if isinstance(event, RawData):
async with self.send_lock:
try:
with trio.CancelScope() as cancel_scope:
cancel_scope.shield = True
await self.stream.send_all(event.data)
except (trio.BrokenResourceError, trio.ClosedResourceError):
await self.protocol.handle(Closed())
elif isinstance(event, Closed):
await self._close()
await self.protocol.handle(Closed())
elif isinstance(event, Updated):
pass # Triggers the keep alive timeout update
await self._update_keep_alive_timeout()
async def protocol_send(self, event: Event) -> None:
if isinstance(event, RawData):
try:
self.writer.write(event.data)
await self.writer.drain()
except (BrokenPipeError, ConnectionResetError):
await self.protocol.handle(Closed())
elif isinstance(event, Closed):
await self._close()
await self.protocol.handle(Closed())
elif isinstance(event, Updated):
pass # Triggers the keep alive timeout update
self._update_keep_alive_timeout()