Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def patched_initialize(peer_cid: bytes):
    """Run the real initializer, then restrict the client to TLS 1.3 draft 28.

    Forwards ``peer_cid`` unchanged to ``real_initialize`` and afterwards
    overwrites ``client.tls._supported_versions`` with a single-entry list
    containing ``tls.TLS_VERSION_1_3_DRAFT_28``.
    """
    real_initialize(peer_cid)

    # Applied after the real call; presumably that call is what creates
    # ``client.tls`` -- confirm against the enclosing test.
    draft_only = [tls.TLS_VERSION_1_3_DRAFT_28]
    client.tls._supported_versions = draft_only
"ada85271d19680c615ea7336519e3fdf6f1e26f3b1075ee1de96ffa8884e8280"
),
session_id=binascii.unhexlify(
"9aee82a2d186c1cb32a329d9dcfe004a1a438ad0485a53c6bfcf55c132a23235"
),
cipher_suite=tls.CipherSuite.AES_256_GCM_SHA384,
compression_method=tls.CompressionMethod.NULL,
key_share=(
tls.Group.SECP256R1,
binascii.unhexlify(
"048b27d0282242d84b7fcc02a9c4f13eca0329e3c7029aa34a33794e6e7ba189"
"5cca1c503bf0378ac6937c354912116ff3251026bca1958d7f387316c83ae6cf"
"b2"
),
),
supported_version=tls.TLS_VERSION_1_3,
)
buf = Buffer(1000)
push_server_hello(buf, hello)
self.assertEqual(buf.data, load("tls_server_hello.bin"))
self.assertEqual(hello.early_data, False)
self.assertEqual(
hello.key_share,
[
(
tls.Group.SECP256R1,
binascii.unhexlify(
"048842315c437bb0ce2929c816fee4e942ec5cb6db6a6b9bf622680188ebb0d4"
"b652e69033f71686aa01cbc79155866e264c9f33f45aa16b0dfa10a222e3a669"
"22"
),
)
],
)
self.assertEqual(
hello.psk_key_exchange_modes, [tls.PskKeyExchangeMode.PSK_DHE_KE]
)
self.assertEqual(hello.server_name, "cloudflare-quic.com")
self.assertEqual(
hello.signature_algorithms,
[
tls.SignatureAlgorithm.ECDSA_SECP256R1_SHA256,
tls.SignatureAlgorithm.ECDSA_SECP384R1_SHA384,
tls.SignatureAlgorithm.ECDSA_SECP521R1_SHA512,
tls.SignatureAlgorithm.ED25519,
tls.SignatureAlgorithm.ED448,
tls.SignatureAlgorithm.RSA_PSS_PSS_SHA256,
tls.SignatureAlgorithm.RSA_PSS_PSS_SHA384,
tls.SignatureAlgorithm.RSA_PSS_PSS_SHA512,
tls.SignatureAlgorithm.RSA_PSS_RSAE_SHA256,
tls.SignatureAlgorithm.RSA_PSS_RSAE_SHA384,
tls.SignatureAlgorithm.RSA_PSS_RSAE_SHA512,
def test_verify_certificate_chain_self_signed(self):
    """Check how ``verify_certificate`` treats a self-signed certificate.

    With ``aioquic.tls.utcnow`` patched to return the certificate's
    ``not_valid_before`` value, a call without ``cadata`` must raise
    ``tls.AlertBadCertificate`` whose message is "self signed certificate",
    while a call that passes the certificate's own PEM encoding as
    ``cadata`` must complete without raising.
    """
    cert, _ = generate_ec_certificate(
        common_name="localhost", curve=ec.SECP256R1
    )

    with patch("aioquic.tls.utcnow") as fake_utcnow:
        # Pin the patched clock to the certificate's not_valid_before value.
        fake_utcnow.return_value = cert.not_valid_before

        # Rejected: no cadata is passed.
        with self.assertRaises(tls.AlertBadCertificate) as raised:
            verify_certificate(certificate=cert, server_name="localhost")
        self.assertEqual(str(raised.exception), "self signed certificate")

        # Accepted: the certificate's own PEM bytes are passed as cadata.
        own_pem = cert.public_bytes(serialization.Encoding.PEM)
        verify_certificate(
            cadata=own_pem,
            certificate=cert,
            server_name="localhost",
        )
server_input = merge_buffers(client_buf)
reset_buffers(client_buf)
# handle client hello
# send server hello, encrypted extensions, certificate, certificate verify, finished
server_buf = create_buffers()
server.handle_message(server_input, server_buf)
self.assertEqual(server.state, State.SERVER_EXPECT_FINISHED)
client_input = merge_buffers(server_buf)
reset_buffers(server_buf)
# mess with finished verify data
client_input = client_input[:-4] + bytes(4)
# handle server hello, encrypted extensions, certificate, certificate verify, finished
with self.assertRaises(tls.AlertDecryptError):
client.handle_message(client_input, client_buf)
"048842315c437bb0ce2929c816fee4e942ec5cb6db6a6b9bf622680188ebb0d4"
"b652e69033f71686aa01cbc79155866e264c9f33f45aa16b0dfa10a222e3a669"
"22"
),
)
],
)
self.assertEqual(
hello.psk_key_exchange_modes, [tls.PskKeyExchangeMode.PSK_DHE_KE]
)
self.assertEqual(hello.server_name, "cloudflare-quic.com")
self.assertEqual(
hello.signature_algorithms,
[
tls.SignatureAlgorithm.ECDSA_SECP256R1_SHA256,
tls.SignatureAlgorithm.ECDSA_SECP384R1_SHA384,
tls.SignatureAlgorithm.ECDSA_SECP521R1_SHA512,
tls.SignatureAlgorithm.ED25519,
tls.SignatureAlgorithm.ED448,
tls.SignatureAlgorithm.RSA_PSS_PSS_SHA256,
tls.SignatureAlgorithm.RSA_PSS_PSS_SHA384,
tls.SignatureAlgorithm.RSA_PSS_PSS_SHA512,
tls.SignatureAlgorithm.RSA_PSS_RSAE_SHA256,
tls.SignatureAlgorithm.RSA_PSS_RSAE_SHA384,
tls.SignatureAlgorithm.RSA_PSS_RSAE_SHA512,
tls.SignatureAlgorithm.RSA_PKCS1_SHA256,
tls.SignatureAlgorithm.RSA_PKCS1_SHA384,
tls.SignatureAlgorithm.RSA_PKCS1_SHA512,
],
)
self.assertEqual(
hello.supported_groups,
# handle client hello
# send server hello, encrypted extensions, finished
server_buf = create_buffers()
server.handle_message(server_input, server_buf)
self.assertEqual(server.state, State.SERVER_EXPECT_FINISHED)
# tamper with pre_share_key index
buf = server_buf[tls.Epoch.INITIAL]
buf.seek(buf.tell() - 1)
buf.push_uint8(1)
client_input = merge_buffers(server_buf)
self.assertEqual(len(client_input), 307)
reset_buffers(server_buf)
# handle server hello and bomb
with self.assertRaises(tls.AlertIllegalParameter):
client.handle_message(client_input, client_buf)
def test_on_packet_lost_crypto(self):
    """Register one INITIAL crypto packet as sent, then report it lost.

    After ``on_packet_sent`` the test checks that the recovery object counts
    the packet's bytes as in flight and that the space tracks one
    ack-eliciting packet and one sent packet.

    NOTE(review): nothing is asserted after ``on_packet_lost`` in the lines
    visible here -- the snippet may be truncated; confirm against the full
    test.
    """
    pn_space = self.INITIAL_SPACE
    size = 1280
    crypto_packet = QuicSentPacket(
        epoch=tls.Epoch.INITIAL,
        in_flight=True,
        is_ack_eliciting=True,
        is_crypto_packet=True,
        packet_number=0,
        packet_type=PACKET_TYPE_INITIAL,
        sent_bytes=size,
        sent_time=123.45,
    )

    # packet sent
    self.recovery.on_packet_sent(crypto_packet, pn_space)
    self.assertEqual(self.recovery.bytes_in_flight, size)
    self.assertEqual(pn_space.ack_eliciting_in_flight, 1)
    self.assertEqual(len(pn_space.sent_packets), 1)

    # packet lost
    self.recovery.on_packet_lost(crypto_packet, pn_space)
def test_on_ack_received_ack_eliciting(self):
    """Register one ack-eliciting 1-RTT packet as sent and check accounting.

    After ``on_packet_sent`` the test checks that the recovery object counts
    the packet's bytes as in flight and that the space tracks one
    ack-eliciting packet and one sent packet.

    NOTE(review): no ACK is processed in the lines visible here even though
    the test name refers to one -- the snippet may be truncated; confirm
    against the full test.
    """
    pn_space = self.ONE_RTT_SPACE
    size = 1280
    data_packet = QuicSentPacket(
        epoch=tls.Epoch.ONE_RTT,
        in_flight=True,
        is_ack_eliciting=True,
        is_crypto_packet=False,
        packet_number=0,
        packet_type=PACKET_TYPE_ONE_RTT,
        sent_bytes=size,
        sent_time=0.0,
    )

    # packet sent
    self.recovery.on_packet_sent(data_packet, pn_space)
    self.assertEqual(self.recovery.bytes_in_flight, size)
    self.assertEqual(pn_space.ack_eliciting_in_flight, 1)
    self.assertEqual(len(pn_space.sent_packets), 1)
def create_buffers():
    """Return a new dict holding one ``Buffer(capacity=4096)`` per epoch.

    The keys are ``tls.Epoch.INITIAL``, ``tls.Epoch.HANDSHAKE`` and
    ``tls.Epoch.ONE_RTT``, inserted in that order; every call builds fresh
    ``Buffer`` objects.
    """
    epochs = (tls.Epoch.INITIAL, tls.Epoch.HANDSHAKE, tls.Epoch.ONE_RTT)
    return {epoch: Buffer(capacity=4096) for epoch in epochs}