Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_pull_short_header_no_fixed_bit(self):
    # A single 0x00 byte has both the 0x80 and 0x40 bits clear; parsing it
    # must fail with a ValueError carrying the "fixed bit" message.
    expected_message = "Packet fixed bit is zero"
    packet = Buffer(data=b"\x00")

    with self.assertRaises(ValueError) as raised:
        pull_quic_header(packet, host_cid_length=8)

    self.assertEqual(str(raised.exception), expected_message)
def test_pull_long_header_no_fixed_bit(self):
    # The first byte is 0x80: the high bit is set but the 0x40 bit is clear.
    # Parsing must fail with a ValueError carrying the "fixed bit" message.
    expected_message = "Packet fixed bit is zero"
    packet = Buffer(data=b"\x80\xff\x00\x00\x11\x00\x00")

    with self.assertRaises(ValueError) as raised:
        pull_quic_header(packet, host_cid_length=8)

    self.assertEqual(str(raised.exception), expected_message)
def test_pull_retry(self):
    # Parse the "retry.bin" fixture and verify every header field, then
    # check how far the buffer's read position has advanced.
    expected_dcid = bytes.fromhex("fee746dfde699d61")
    expected_scid = bytes.fromhex("59aa0942fd2f11e9")
    expected_odcid = bytes.fromhex("d61e7448e0d63dff")
    expected_token = bytes.fromhex(
        "5282f57f85a1a5c50de5aac2ff7dba43ff34524737099ec41c4b8e8c76734f935e8efd51177dbbe764"
    )

    packet = Buffer(data=load("retry.bin"))
    parsed = pull_quic_header(packet, host_cid_length=8)

    self.assertTrue(parsed.is_long_header)
    self.assertEqual(parsed.version, QuicProtocolVersion.DRAFT_24)
    self.assertEqual(parsed.packet_type, PACKET_TYPE_RETRY)
    self.assertEqual(parsed.destination_cid, expected_dcid)
    self.assertEqual(parsed.source_cid, expected_scid)
    self.assertEqual(parsed.original_destination_cid, expected_odcid)
    self.assertEqual(parsed.token, expected_token)
    self.assertEqual(parsed.rest_length, 0)
    self.assertEqual(packet.tell(), 73)
def test_pull_version_negotiation(self):
    # Parse the "version_negotiation.bin" fixture and verify the header
    # fields, then check how far the buffer's read position has advanced.
    packet = Buffer(data=load("version_negotiation.bin"))
    header = pull_quic_header(packet, host_cid_length=8)

    self.assertTrue(header.is_long_header)
    self.assertEqual(header.version, QuicProtocolVersion.NEGOTIATION)
    # Use an identity check for the None singleton: it is stricter than
    # `== None` and yields a clearer failure message.
    self.assertIsNone(header.packet_type)
    self.assertEqual(header.destination_cid, bytes.fromhex("9aac5a49ba87a849"))
    self.assertEqual(header.source_cid, bytes.fromhex("f92f4336fa951ba1"))
    self.assertEqual(header.original_destination_cid, b"")
    self.assertEqual(header.token, b"")
    self.assertEqual(header.rest_length, 8)
    self.assertEqual(packet.tell(), 23)
def test_pull_long_header_scid_too_long(self):
    # The crafted packet must be rejected with a ValueError reporting a
    # 21-byte source connection ID.
    raw_packet = bytes.fromhex(
        "c2ff0000160015000000000000000000000000000000000000000000004"
        "01cfcee99ec4bbf1f7a30f9b0c9417b8c263cdd8cc972a4439d68a46320"
    )
    expected_message = "Source CID is too long (21 bytes)"
    packet = Buffer(data=raw_packet)

    with self.assertRaises(ValueError) as raised:
        pull_quic_header(packet, host_cid_length=8)

    self.assertEqual(str(raised.exception), expected_message)
def test_pull_long_header_too_short(self):
    # Only two bytes (0xc0 0x00) are supplied; the call is expected to
    # raise BufferReadError.
    truncated = Buffer(data=b"\xc0\x00")
    self.assertRaises(
        BufferReadError, pull_quic_header, truncated, host_cid_length=8
    )
def test_pull_short_header(self):
    # Parse the "short_header.bin" fixture and verify the header fields,
    # then check how far the buffer's read position has advanced.
    packet = Buffer(data=load("short_header.bin"))
    header = pull_quic_header(packet, host_cid_length=8)

    self.assertFalse(header.is_long_header)
    # Use an identity check for the None singleton: it is stricter than
    # `== None` and yields a clearer failure message.
    self.assertIsNone(header.version)
    self.assertEqual(header.packet_type, 0x50)
    self.assertEqual(header.destination_cid, bytes.fromhex("f45aa7b59c0e1ad6"))
    self.assertEqual(header.source_cid, b"")
    self.assertEqual(header.original_destination_cid, b"")
    self.assertEqual(header.token, b"")
    self.assertEqual(header.rest_length, 12)
    self.assertEqual(packet.tell(), 9)
def test_pull_initial_server(self):
    # Parse the "initial_server.bin" fixture and verify the header fields,
    # then check how far the buffer's read position has advanced.
    expected_scid = bytes.fromhex("195c68344e28d479")

    packet = Buffer(data=load("initial_server.bin"))
    parsed = pull_quic_header(packet, host_cid_length=8)

    self.assertTrue(parsed.is_long_header)
    self.assertEqual(parsed.version, QuicProtocolVersion.DRAFT_24)
    self.assertEqual(parsed.packet_type, PACKET_TYPE_INITIAL)
    self.assertEqual(parsed.destination_cid, b"")
    self.assertEqual(parsed.source_cid, expected_scid)
    self.assertEqual(parsed.original_destination_cid, b"")
    self.assertEqual(parsed.token, b"")
    self.assertEqual(parsed.rest_length, 184)
    self.assertEqual(packet.tell(), 18)
def test_pull_empty(self):
    # With no bytes at all, the call is expected to raise BufferReadError.
    empty = Buffer(data=b"")
    self.assertRaises(
        BufferReadError, pull_quic_header, empty, host_cid_length=8
    )
async def handle(self, event: Event) -> None:
if isinstance(event, RawData):
try:
header = pull_quic_header(Buffer(data=event.data), host_cid_length=8)
except ValueError:
return
if (
header.version is not None
and header.version not in self.quic_config.supported_versions
):
data = encode_quic_version_negotiation(
source_cid=header.destination_cid,
destination_cid=header.source_cid,
supported_versions=self.quic_config.supported_versions,
)
await self.send(RawData(data=data, address=event.address))
return
connection = self.connections.get(header.destination_cid)
if (