Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Returns:
r_bytes, s_bytes: ecdsa signature R, S in bytes
Raises:
BootstrapError: if the gateway cannot be properly loaded
"""
try:
challenge_key = cert_utils.load_key(self._challenge_key_file)
except (IOError, ValueError, TypeError) as e:
raise BootstrapError(
'Gateway does not have a proper challenge key: %s' % e,
)
try:
signature = challenge_key.sign(
challenge, ec.ECDSA(hashes.SHA256()),
)
except TypeError:
raise BootstrapError(
'Challenge key cannot be used for ECDSA signature',
)
r_int, s_int = decode_dss_signature(signature)
r_bytes = r_int.to_bytes((r_int.bit_length() + 7) // 8, 'big')
s_bytes = s_int.to_bytes((s_int.bit_length() + 7) // 8, 'big')
return r_bytes, s_bytes
self._nonce = nonce
nonce = utils.read_only_property("_nonce")
def validate_for_algorithm(self, algorithm):
_check_aes_key_length(self, algorithm)
if len(self.nonce) * 8 != algorithm.block_size:
raise ValueError("Invalid nonce size ({0}) for {1}.".format(
len(self.nonce), self.name
))
@utils.register_interface(Mode)
@utils.register_interface(ModeWithInitializationVector)
@utils.register_interface(ModeWithAuthenticationTag)
class GCM(object):
name = "GCM"
_MAX_ENCRYPTED_BYTES = (2 ** 39 - 256) // 8
_MAX_AAD_BYTES = (2 ** 64) // 8
def __init__(self, initialization_vector, tag=None, min_tag_length=16):
# len(initialization_vector) must in [1, 2 ** 64), but it's impossible
# to actually construct a bytes object that large, so we don't check
# for it
if not isinstance(initialization_vector, bytes):
raise TypeError("initialization_vector must be bytes")
self._initialization_vector = initialization_vector
if tag is not None:
if not isinstance(tag, bytes):
raise TypeError("tag must be bytes or None")
if min_tag_length < 4:
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import absolute_import, division, print_function
from cryptography import utils
from cryptography.exceptions import InvalidTag, UnsupportedAlgorithm, _Reasons
from cryptography.hazmat.primitives import ciphers
from cryptography.hazmat.primitives.ciphers import modes
@utils.register_interface(ciphers.CipherContext)
@utils.register_interface(ciphers.AEADCipherContext)
@utils.register_interface(ciphers.AEADEncryptionContext)
@utils.register_interface(ciphers.AEADDecryptionContext)
class _CipherContext(object):
_ENCRYPT = 1
_DECRYPT = 0
def __init__(self, backend, cipher, mode, operation):
self._backend = backend
self._cipher = cipher
self._mode = mode
self._operation = operation
self._tag = None
if isinstance(self._cipher, ciphers.BlockCipherAlgorithm):
self._block_size_bytes = self._cipher.block_size // 8
)
@pytest.mark.supported(
only_if=lambda backend: backend.hash_supported(hashes.MD5()),
skip_message="Does not support MD5",
)
@pytest.mark.requires_backend_interface(interface=HashBackend)
class TestMD5(object):
test_md5 = generate_hash_test(
load_hash_vectors,
os.path.join("hashes", "MD5"),
[
"rfc-1321.txt",
],
hashes.MD5(),
)
@pytest.mark.supported(
only_if=lambda backend: backend.hash_supported(
hashes.BLAKE2b(digest_size=64)),
skip_message="Does not support BLAKE2b",
)
@pytest.mark.requires_backend_interface(interface=HashBackend)
class TestBLAKE2b(object):
test_b2b = generate_hash_test(
load_hash_vectors,
os.path.join("hashes", "blake2"),
[
"blake2b.txt",
],
with pytest.raises(AlreadyFinalized):
h.finalize()
def test_unsupported_hash(self, backend):
with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_HASH):
hashes.Hash(DummyHashAlgorithm(), backend)
@pytest.mark.supported(
only_if=lambda backend: backend.hash_supported(hashes.SHA1()),
skip_message="Does not support SHA1",
)
@pytest.mark.requires_backend_interface(interface=HashBackend)
class TestSHA1(object):
test_sha1 = generate_base_hash_test(
hashes.SHA1(),
digest_size=20,
)
@pytest.mark.supported(
only_if=lambda backend: backend.hash_supported(hashes.SHA224()),
skip_message="Does not support SHA224",
)
@pytest.mark.requires_backend_interface(interface=HashBackend)
class TestSHA224(object):
test_sha224 = generate_base_hash_test(
hashes.SHA224(),
digest_size=28,
)
def test_invalid_backend():
pretend_backend = object()
with raises_unsupported_algorithm(_Reasons.BACKEND_MISSING_INTERFACE):
HKDF(hashes.SHA256(), 16, None, None, pretend_backend)
with raises_unsupported_algorithm(_Reasons.BACKEND_MISSING_INTERFACE):
HKDFExpand(hashes.SHA256(), 16, None, pretend_backend)
def test_subject_alt_names(self, backend):
private_key = RSA_KEY_2048.private_key(backend)
csr = x509.CertificateSigningRequestBuilder().subject_name(
x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u"SAN"),
])
).add_extension(
x509.SubjectAlternativeName([
x509.DNSName(u"example.com"),
x509.DNSName(u"*.example.com"),
x509.RegisteredID(x509.ObjectIdentifier("1.2.3.4.5.6.7")),
x509.DirectoryName(x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, u'PyCA'),
x509.NameAttribute(
NameOID.ORGANIZATION_NAME, u'We heart UTF8!\u2122'
)
])),
x509.IPAddress(ipaddress.ip_address(u"127.0.0.1")),
x509.IPAddress(ipaddress.ip_address(u"ff::")),
x509.OtherName(
type_id=x509.ObjectIdentifier("1.2.3.3.3.3"),
value=b"0\x03\x02\x01\x05"
),
x509.RFC822Name(u"test@example.com"),
x509.RFC822Name(u"email"),
x509.RFC822Name(u"email@em\xe5\xefl.com"),
x509.UniformResourceIdentifier(
def create_client_cert() -> bytes:
ca_key = gen_private_key()
ca_cert = gen_certificate(ca_key, "certificate_authority")
client_key = gen_private_key()
client_cert = gen_certificate(
client_key, "client", issuer="certificate_authority", sign_key=ca_key
)
return client_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
) + client_cert.public_bytes(encoding=serialization.Encoding.PEM)
def test_prehashed_sign(self, backend):
private_key = RSA_KEY_512.private_key(backend)
message = b"one little message"
h = hashes.Hash(hashes.SHA1(), backend)
h.update(message)
digest = h.finalize()
pss = padding.PSS(mgf=padding.MGF1(hashes.SHA1()), salt_length=0)
prehashed_alg = asym_utils.Prehashed(hashes.SHA1())
signature = private_key.sign(digest, pss, prehashed_alg)
public_key = private_key.public_key()
public_key.verify(signature, message, pss, hashes.SHA1())