Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_request_fragmented_frame(self):
quic_client = FakeQuicConnection(
configuration=QuicConfiguration(is_client=True)
)
quic_server = FakeQuicConnection(
configuration=QuicConfiguration(is_client=False)
)
h3_client = H3Connection(quic_client)
h3_server = H3Connection(quic_server)
# send request
stream_id = quic_client.get_next_available_stream_id()
h3_client.send_headers(
stream_id=stream_id,
headers=[
(b":method", b"GET"),
(b":scheme", b"https"),
(b":authority", b"localhost"),
(b":path", b"/"),
(b"x-foo", b"client"),
],
def test_handle_push_frame_wrong_frame_type(self):
    """
    We should not receive SETTINGS on a push stream.

    Feeds a push stream carrying an empty SETTINGS frame to a client-side
    H3 connection and checks the error recorded on the fake QUIC connection.
    """
    client_quic = FakeQuicConnection(configuration=QuicConfiguration(is_client=True))
    client_h3 = H3Connection(client_quic)

    # Payload layout: stream type (PUSH), push ID 0, then an empty SETTINGS frame.
    payload = (
        encode_uint_var(StreamType.PUSH)
        + encode_uint_var(0)
        + encode_frame(FrameType.SETTINGS, b"")
    )
    event = StreamDataReceived(stream_id=15, data=payload, end_stream=False)
    client_h3.handle_event(event)

    self.assertEqual(
        client_quic.closed,
        (ErrorCode.HTTP_FRAME_UNEXPECTED, "Invalid frame type on push stream"),
    )
def create_standalone_client(self, **client_options):
    """
    Build a client-side QuicConnection, start its handshake and return it.

    Any keyword arguments are forwarded to QuicConfiguration alongside
    ``is_client=True`` and a fresh QuicLogger.
    """
    config = QuicConfiguration(
        is_client=True, quic_logger=QuicLogger(), **client_options
    )
    standalone = QuicConnection(configuration=config)
    standalone._ack_delay = 0

    # Start the handshake; drop() on the new connection is expected to return 1.
    standalone.connect(SERVER_ADDR, now=time.time())
    self.assertEqual(drop(standalone), 1)
    return standalone
def test_send_headers_after_trailers(self):
    """
    We should not send HEADERS after trailers.

    Sends request headers and then a trailer block on the same stream of a
    client-side H3 connection.
    """
    # NOTE(review): no assertion is visible in this test; presumably a check
    # that a further send_headers() call is rejected belongs here - confirm.
    client_quic = FakeQuicConnection(configuration=QuicConfiguration(is_client=True))
    client_h3 = H3Connection(client_quic)
    sid = client_quic.get_next_available_stream_id()

    request_headers = [
        (b":method", b"GET"),
        (b":scheme", b"https"),
        (b":authority", b"localhost"),
        (b":path", b"/"),
    ]
    client_h3.send_headers(stream_id=sid, headers=request_headers)

    trailers = [(b"x-some-trailer", b"foo")]
    client_h3.send_headers(stream_id=sid, headers=trailers, end_stream=False)
def test_handle_qpack_encoder_duplicate(self):
"""
We must only receive a single QPACK encoder stream.
"""
quic_client = FakeQuicConnection(
configuration=QuicConfiguration(is_client=True)
)
h3_client = H3Connection(quic_client)
# receive a first encoder stream
h3_client.handle_event(
StreamDataReceived(
stream_id=11,
data=encode_uint_var(StreamType.QPACK_ENCODER),
end_stream=False,
)
)
# receive a second encoder stream
h3_client.handle_event(
StreamDataReceived(
stream_id=15,
def test_handle_control_stream_duplicate(self):
    """
    We must only receive a single control stream.

    Delivers the control stream type to a server-side H3 connection on
    stream 2 and then again on stream 6.
    """
    # NOTE(review): no assertion is visible in this test; presumably the
    # resulting state of the fake QUIC connection should be checked - confirm.
    server_quic = FakeQuicConnection(configuration=QuicConfiguration(is_client=False))
    server_h3 = H3Connection(server_quic)

    # The first iteration opens the control stream; the second is the duplicate.
    for sid in (2, 6):
        server_h3.handle_event(
            StreamDataReceived(
                stream_id=sid,
                data=encode_uint_var(StreamType.CONTROL),
                end_stream=False,
            )
        )
def test_handle_qpack_encoder_stream_error(self):
    """
    Garbage on the QPACK encoder stream is reported as an encoder stream error.

    Sends the QPACK encoder stream type followed by a single zero byte and
    checks the error code and empty reason recorded on the fake connection.
    """
    client_quic = FakeQuicConnection(configuration=QuicConfiguration(is_client=True))
    client_h3 = H3Connection(client_quic)

    # Stream type prefix followed by one byte of encoder-stream data.
    garbage = encode_uint_var(StreamType.QPACK_ENCODER) + b"\x00"
    client_h3.handle_event(
        StreamDataReceived(stream_id=7, data=garbage, end_stream=False)
    )

    self.assertEqual(
        client_quic.closed, (ErrorCode.HTTP_QPACK_ENCODER_STREAM_ERROR, "")
    )
async def run_client_ping(host, port=4433):
    """
    Connect to ``host``:``port`` and send two pings, one after the other.

    The client configuration loads its CA certificates from SERVER_CACERTFILE.
    """
    client_config = QuicConfiguration(is_client=True)
    client_config.load_verify_locations(cafile=SERVER_CACERTFILE)

    async with connect(host, port, configuration=client_config) as session:
        # Each ping is awaited before the next one is issued.
        for _ in range(2):
            await session.ping()
async def run_client_ping(host, port=4433):
    """
    Connect to ``host``:``port`` and await 16 pings handed to asyncio.gather.

    The client configuration loads its CA certificates from SERVER_CACERTFILE.
    """
    client_config = QuicConfiguration(is_client=True)
    client_config.load_verify_locations(cafile=SERVER_CACERTFILE)

    async with connect(host, port, configuration=client_config) as session:
        # All 16 ping coroutines are created first, then awaited together.
        await asyncio.gather(*(session.ping() for _ in range(16)))