Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _fetch_certs(hostname_file):
with open(hostname_file) as f:
hostnames = f.read().split('\n')
map_serial_to_name = {}
for h in hostnames:
if not h:
continue
connection = _openssl_connect(h, 443)
for cert_openssl in connection.get_peer_cert_chain():
cert_der = dump_certificate(FILETYPE_ASN1, cert_openssl)
cert = Certificate.load(cert_der)
map_serial_to_name[cert.serial_number] = cert.subject.native
return map_serial_to_name
data must be a byte string, not %s
''',
type_name(data)
))
if hash_algorithm not in set(['sha1', 'sha224', 'sha256', 'sha384', 'sha512']):
raise ValueError(pretty_message(
'''
hash_algorithm must be one of "sha1", "sha224", "sha256", "sha384",
"sha512", not %s
''',
repr(hash_algorithm)
))
asn1 = certificate_or_public_key.asn1
if isinstance(asn1, Certificate):
asn1 = asn1.public_key
curve_base_point = {
'secp256r1': SECP256R1_BASE_POINT,
'secp384r1': SECP384R1_BASE_POINT,
'secp521r1': SECP521R1_BASE_POINT,
}[curve_name]
x, y = asn1['public_key'].to_coords()
n = curve_base_point.order
# Validates that the point is valid
public_key_point = PrimePoint(curve_base_point.curve, x, y, n)
try:
signature = DSASignature.load(signature)
Parses binary data in :attr:`self.crt_data` and parses the content.
The server certificate a.k.a. *end_entity* is put in
:attr:`self.end_entity`, anything else that has a CA extension is added
to :attr:`self.intermediates`.
.. Note:: At this point it is not clear yet which of the intermediates
is the root and which are actual intermediates.
:raises CertParsingError: If the certificate file can't be read, it
contains errors or parts of the chain are missing.
"""
try:
pem_obj = asn1crypto.pem.unarmor(self.crt_data, multiple=True)
for type_name, _, der_bytes in pem_obj:
if type_name == 'CERTIFICATE':
crt = asn1crypto.x509.Certificate.load(der_bytes)
if getattr(crt, 'ca'):
LOG.debug("Found part of the chain..")
self.intermediates.append(crt)
else:
LOG.debug("Found the end entity..")
self.end_entity = crt
self.ocsp_urls = getattr(crt, 'ocsp_urls')
except (binascii.Error, ValueError):
raise CertParsingError(
"Certificate file contains errors \"{}\".".format(
self.filename
)
)
if len(self.intermediates) < 1:
raise CertParsingError(
"Can't find the CA certificate chain items in \"{}\".".format(
def prepend(self, cert):
"""
Prepends a cert to the path. This should be the issuer of the previously
prepended cert.
:param cert:
An asn1crypto.x509.Certificate object or a byte string
:return:
The current ValidationPath object, for chaining
"""
if not isinstance(cert, x509.Certificate):
if not isinstance(cert, byte_cls):
raise TypeError(pretty_message(
'''
cert must be a byte string or an
asn1crypto.x509.Certificate object, not %s
''',
type_name(cert)
))
if pem.detect(cert):
_, _, cert = pem.unarmor(cert)
cert = x509.Certificate.load(cert)
if cert.issuer_serial in self._cert_hashes:
raise DuplicateCertificateError()
self._cert_hashes.add(cert.issuer_serial)
Takes a list of byte strings or asn1crypto.x509.Certificates objects,
validates and loads them while unarmoring any PEM-encoded contents
:param certs:
A list of byte strings or asn1crypto.x509.Certificate objects
:param var_name:
A unicode variable name to use in any TypeError exceptions
:return:
A list of asn1crypto.x509.Certificate objects
"""
output = []
for cert in certs:
if isinstance(cert, x509.Certificate):
output.append(cert)
else:
if not isinstance(cert, byte_cls):
raise TypeError(pretty_message(
'''
%s must contain only byte strings or
asn1crypto.x509.Certificate objects, not %s
''',
var_name,
type_name(cert)
))
if pem.detect(cert):
_, _, cert = pem.unarmor(cert)
output.append(x509.Certificate.load(cert))
return output
def append(self, cert):
"""
Appends a cert to the path. This should be a cert issued by the last
cert in the path.
:param cert:
An asn1crypto.x509.Certificate object
:return:
The current ValidationPath object, for chaining
"""
if not isinstance(cert, x509.Certificate):
if not isinstance(cert, byte_cls):
raise TypeError(pretty_message(
'''
cert must be a byte string or an
asn1crypto.x509.Certificate object, not %s
''',
type_name(cert)
))
if pem.detect(cert):
_, _, cert = pem.unarmor(cert)
cert = x509.Certificate.load(cert)
if cert.issuer_serial in self._cert_hashes:
raise DuplicateCertificateError()
self._cert_hashes.add(cert.issuer_serial)
Takes a list of byte strings or asn1crypto.x509.Certificates objects,
validates and loads them while unarmoring any PEM-encoded contents
:param certs:
A list of byte strings or asn1crypto.x509.Certificate objects
:param var_name:
A unicode variable name to use in any TypeError exceptions
:return:
A list of asn1crypto.x509.Certificate objects
"""
output = []
for cert in certs:
if isinstance(cert, x509.Certificate):
output.append(cert)
else:
if not isinstance(cert, byte_cls):
raise TypeError(pretty_message(
'''
%s must contain only byte strings or
asn1crypto.x509.Certificate objects, not %s
''',
var_name,
type_name(cert)
))
if pem.detect(cert):
_, _, cert = pem.unarmor(cert)
output.append(x509.Certificate.load(cert))
return output
"""
Loads an x509 certificate into a Certificate object
:param source:
A byte string of file contents or a unicode string filename
:raises:
ValueError - when any of the parameters contain an invalid value
TypeError - when any of the parameters are of the wrong type
OSError - when an error is returned by the OS crypto library
:return:
A Certificate object
"""
if isinstance(source, x509.Certificate):
certificate = source
elif isinstance(source, byte_cls):
certificate = parse_certificate(source)
elif isinstance(source, str_cls):
with open(source, 'rb') as f:
certificate = parse_certificate(f.read())
else:
raise TypeError(pretty_message(
'''
source must be a byte string, unicode string or
asn1crypto.x509.Certificate object, not %s
''',
type_name(source)