Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_iri_with_port(self):
with open(os.path.join(fixtures_dir, 'admin.ch.crt'), 'rb') as f:
cert_bytes = f.read()
if pem.detect(cert_bytes):
_, _, cert_bytes = pem.unarmor(cert_bytes)
cert = x509.Certificate.load(cert_bytes)
self.assertEqual(
[dp.native for dp in cert.crl_distribution_points],
[
util.OrderedDict([
('distribution_point', ['http://www.pki.admin.ch/crl/SSLCA01.crl']),
('reasons', None),
('crl_issuer', None)
]),
util.OrderedDict([
(
'distribution_point',
[
'ldap://admindir.admin.ch:389/'
'cn=Swiss Government SSL CA 01,'
def test_build_paths(self):
with open(os.path.join(fixtures_dir, 'mozilla.org.crt'), 'rb') as f:
cert_bytes = f.read()
if pem.detect(cert_bytes):
_, _, cert_bytes = pem.unarmor(cert_bytes)
cert = x509.Certificate.load(cert_bytes)
with open(os.path.join(fixtures_dir, 'digicert-sha2-secure-server-ca.crt'), 'rb') as f:
other_certs = [f.read()]
repo = CertificateRegistry(other_certs=other_certs)
paths = repo.build_paths(cert)
self.assertEqual(1, len(paths))
path = paths[0]
self.assertEqual(3, len(path))
self.assertEqual(
[
b'\x80Q\x06\x012\xad\x9a\xc2}Q\x87\xa0\xe8\x87\xfb\x01b\x01U\xee',
b"\x10_\xa6z\x80\x08\x9d\xb5'\x9f5\xce\x83\x0bC\x88\x9e\xa3\xc7\r",
b'I\xac\x03\xf8\xf3Km\xca)V)\xf2I\x9a\x98\xbe\x98\xdc.\x81'
def _load_cert(self, relative_path):
with open(os.path.join(fixtures_dir, relative_path), 'rb') as f:
cert_bytes = f.read()
if pem.detect(cert_bytes):
_, _, cert_bytes = pem.unarmor(cert_bytes)
return x509.Certificate.load(cert_bytes)
def test_build_paths_custom_ca_certs(self):
with open(os.path.join(fixtures_dir, 'mozilla.org.crt'), 'rb') as f:
cert_bytes = f.read()
if pem.detect(cert_bytes):
_, _, cert_bytes = pem.unarmor(cert_bytes)
cert = x509.Certificate.load(cert_bytes)
with open(os.path.join(fixtures_dir, 'digicert-sha2-secure-server-ca.crt'), 'rb') as f:
other_certs = [f.read()]
repo = CertificateRegistry(trust_roots=other_certs)
paths = repo.build_paths(cert)
self.assertEqual(1, len(paths))
path = paths[0]
self.assertEqual(2, len(path))
self.assertEqual(
[
b"\x10_\xa6z\x80\x08\x9d\xb5'\x9f5\xce\x83\x0bC\x88\x9e\xa3\xc7\r",
b'I\xac\x03\xf8\xf3Km\xca)V)\xf2I\x9a\x98\xbe\x98\xdc.\x81'
],
logger = logging.getLogger(__name__)
# https://securityblog.redhat.com/2014/06/18/openssl-privilege-separation-analysis/
# https://jamielinux.com/docs/openssl-certificate-authority/
# http://pycopia.googlecode.com/svn/trunk/net/pycopia/ssl/certs.py
# Cache CA certificate
with open(config.AUTHORITY_CERTIFICATE_PATH, "rb") as fh:
certificate_buf = fh.read()
header, _, certificate_der_bytes = pem.unarmor(certificate_buf)
certificate = x509.Certificate.load(certificate_der_bytes)
public_key = asymmetric.load_public_key(certificate["tbs_certificate"]["subject_public_key_info"])
with open(config.AUTHORITY_PRIVATE_KEY_PATH, "rb") as fh:
key_buf = fh.read()
header, _, key_der_bytes = pem.unarmor(key_buf)
private_key = asymmetric.load_private_key(key_der_bytes)
def self_enroll(skip_notify=False):
assert os.getuid() == 0 and os.getgid() == 0, "Can self-enroll only as root"
from certidude import const, config
common_name = const.FQDN
os.umask(0o0177)
try:
path, buf, cert, signed, expires = get_signed(common_name)
self_public_key = asymmetric.load_public_key(path)
private_key = asymmetric.load_private_key(config.SELF_KEY_PATH)
except FileNotFoundError: # certificate or private key not found
click.echo("Generating private key for frontend: %s" % config.SELF_KEY_PATH)
with open(config.SELF_KEY_PATH, 'wb') as fh:
:param validation_context:
A certvalidator.context.ValidationContext() object that controls
validation options
"""
if not isinstance(end_entity_cert, Certificate):
if not isinstance(end_entity_cert, byte_cls):
raise TypeError(pretty_message(
'''
end_entity_cert must be a byte string or an instance of
asn1crypto.x509.Certificate, not %s
''',
type_name(end_entity_cert)
))
if pem.detect(end_entity_cert):
_, _, end_entity_cert = pem.unarmor(end_entity_cert)
end_entity_cert = Certificate.load(end_entity_cert)
if validation_context is None:
validation_context = ValidationContext()
if not isinstance(validation_context, ValidationContext):
raise TypeError(pretty_message(
'''
validation_context must be an instance of
certvalidator.context.ValidationContext, not %s
''',
type_name(validation_context)
))
if intermediate_certs is not None:
certificate_registry = validation_context.certificate_registry
Attempt to renew certificate using currently valid key pair
"""
try:
path, buf, cert, signed, expires = self.authority.get_signed(common_name)
except EnvironmentError:
pass # No currently valid certificate for this common name
else:
cert_pk = cert["tbs_certificate"]["subject_public_key_info"].native
csr_pk = csr["certification_request_info"]["subject_pk_info"].native
# Same public key
if cert_pk == csr_pk:
buf = req.get_header("X-SSL-CERT")
if buf:
# Used mutually authenticated TLS handshake, assume renewal
header, _, der_bytes = pem.unarmor(buf.replace("\t", "\n").replace("\n\n", "\n").encode("ascii"))
handshake_cert = x509.Certificate.load(der_bytes)
if handshake_cert.native == cert.native:
for subnet in config.RENEWAL_SUBNETS:
if req.context.get("remote_addr") in subnet:
resp.set_header("Content-Type", "application/x-x509-user-cert")
setxattr(path, "user.revocation.reason", "superseded")
_, resp.body = self.authority._sign(csr, body, overwrite=True,
profile=SignatureProfile.from_cert(cert))
logger.info("Renewing certificate for %s as %s is whitelisted", common_name, req.context.get("remote_addr"))
return
reasons.append("renewal failed")
else:
# No renewal requested, redirect to signed API call
resp.status = falcon.HTTP_SEE_OTHER
resp.location = os.path.join(os.path.dirname(req.relative_uri), "signed", common_name)
return
:return:
A boolean indicating if the certificate was added - will return
False if the certificate was already present
"""
if not isinstance(cert, x509.Certificate):
if not isinstance(cert, byte_cls):
raise TypeError(pretty_message(
'''
cert must be a byte string or an instance of
asn1crypto.x509.Certificate, not %s
''',
type_name(cert)
))
if pem.detect(cert):
_, _, cert = pem.unarmor(cert)
cert = x509.Certificate.load(cert)
hashable = cert.subject.hashable
if hashable not in self._subject_map:
self._subject_map[hashable] = []
# Don't add the cert if we already have it
else:
serial_number = cert.serial_number
for existing_cert in self._subject_map[hashable]:
if existing_cert.serial_number == serial_number:
return False
self._subject_map[hashable].append(cert)
if cert.key_identifier:
self._key_identifier_map[cert.key_identifier] = cert
output = []
for cert in certs:
if isinstance(cert, x509.Certificate):
output.append(cert)
else:
if not isinstance(cert, byte_cls):
raise TypeError(pretty_message(
'''
%s must contain only byte strings or
asn1crypto.x509.Certificate objects, not %s
''',
var_name,
type_name(cert)
))
if pem.detect(cert):
_, _, cert = pem.unarmor(cert)
output.append(x509.Certificate.load(cert))
return output