Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for j in range(0,10):
r = requests.get("http://ca.example.lan/api/")
if r.status_code != 502:
break
sleep(1)
assert r.status_code == 401, "Timed out starting up the API backend"
# TODO: check that port 8080 is listening, otherwise app probably crashed
# Test CA certificate fetch
r = requests.get("http://ca.example.lan/api/certificate")
assert r.status_code == 200
assert r.headers.get('content-type') == "application/x-x509-ca-cert"
header, _, certificate_der_bytes = pem.unarmor(r.text.encode("ascii"))
cert = x509.Certificate.load(certificate_der_bytes)
assert cert.subject.native.get("common_name") == "Certidude at ca.example.lan"
assert cert.subject.native.get("organizational_unit_name") == "Certificate Authority"
assert cert.serial_number >= 0x150000000000000000000000000000
assert cert.serial_number <= 0xfffffffffffffffffffffffffffffffffffffff
assert cert["tbs_certificate"]["validity"]["not_before"].native.replace(tzinfo=None) < datetime.utcnow()
assert cert["tbs_certificate"]["validity"]["not_after"].native.replace(tzinfo=None) > datetime.utcnow() + timedelta(days=7000)
assert cert["tbs_certificate"]["validity"]["not_before"].native.replace(tzinfo=None) < datetime.utcnow()
extensions = cert["tbs_certificate"]["extensions"].native
assert extensions[0] == OrderedDict([
('extn_id', 'basic_constraints'),
('critical', True),
('extn_value', OrderedDict([
('ca', True),
('path_len_constraint', None)]
def get_certificates_v2(self):
    """
    Parse every certificate contained in the v2 signing block.

    The DER blobs are obtained from :meth:`get_certificates_der_v2` and each
    one is loaded into an :class:`asn1crypto.x509.Certificate`.

    No distinction is made between signers: the result is simply the
    certificates of all signers, in the order the DER blobs are yielded.

    :returns: a list of :class:`asn1crypto.x509.Certificate` objects
    """
    parsed = []
    for der_blob in self.get_certificates_der_v2():
        parsed.append(x509.Certificate.load(der_blob))
    return parsed
def cert2asn(cert):
    """
    Convert a certificate object into an ``asn1crypto`` certificate.

    :param cert: an object exposing ``public_bytes(encoding)`` that accepts
        ``serialization.Encoding`` members (presumably a ``cryptography``
        x509 certificate -- confirm against the callers).
    :return: the value of ``x509.Certificate.load`` applied to the DER
        encoding of ``cert``.
    """
    # Request DER directly. The previous implementation serialised to PEM,
    # ran a detection check that is always true for that output, and then
    # stripped the armor again, only to arrive at these same DER bytes.
    der_bytes = cert.public_bytes(serialization.Encoding.DER)
    return x509.Certificate.load(der_bytes)
A (copied) list of 3-element tuples containing CA certs from the OS
trust list:
- 0: an asn1crypto.x509.Certificate object
- 1: a set of unicode strings of OIDs of trusted purposes
- 2: a set of unicode strings of OIDs of rejected purposes
"""
if not _in_memory_up_to_date(cache_length):
with memory_lock:
if not _in_memory_up_to_date(cache_length):
certs = []
for cert_bytes, trust_oids, reject_oids in extract_from_system(cert_callback):
if map_vendor_oids:
trust_oids = _map_oids(trust_oids)
reject_oids = _map_oids(reject_oids)
certs.append((Certificate.load(cert_bytes), trust_oids, reject_oids))
_module_values['certs'] = certs
_module_values['last_update'] = time.time()
return list(_module_values['certs'])
def wrapped(resource, req, resp, *args, **kwargs):
    """
    Run ``func`` only for requests carrying a server-auth TLS certificate.

    The PEM certificate is read from the ``X-SSL-CERT`` request header.
    When one of its extensions is ``extended_key_usage`` and lists
    ``server_auth``, the certificate's common name is stored in
    ``req.context["machine"]`` and the call is forwarded to ``func``.

    :raises falcon.HTTPForbidden: when the header is missing or empty, or
        when no extension grants ``server_auth``.
    """
    armored = req.get_header("X-SSL-CERT")
    if not armored:
        logger.info("No TLS certificate presented to access administrative API call")
        raise falcon.HTTPForbidden("Forbidden", "Machine not authorized to perform the operation")

    # Tabs are dropped before decoding the PEM armor
    # (presumably inserted by the proxy when folding the header -- confirm).
    pem_bytes = armored.replace("\t", "").encode("ascii")
    _, _, der_bytes = pem.unarmor(pem_bytes)
    cert = x509.Certificate.load(der_bytes)  # TODO: validate serial

    for ext in cert["tbs_certificate"]["extensions"]:
        if ext["extn_id"].native != "extended_key_usage":
            continue
        if "server_auth" not in ext["extn_value"].native:
            continue
        # First matching extension wins: record the machine and hand over.
        req.context["machine"] = cert.subject.native["common_name"]
        return func(resource, req, resp, *args, **kwargs)

    logger.info("TLS authenticated machine '%s' not authorized to access administrative API", cert.subject.native["common_name"])
    raise falcon.HTTPForbidden("Forbidden", "Machine not authorized to perform the operation")
return wrapped
if message_type == b'\x0b':
chain_bytes = message_data
break
if chain_bytes:
break
if chain_bytes:
# The first 3 bytes are the cert chain length
pointer = 3
while pointer < len(chain_bytes):
cert_length = int_from_bytes(chain_bytes[pointer:pointer + 3])
cert_start = pointer + 3
cert_end = cert_start + cert_length
pointer = cert_end
cert_bytes = chain_bytes[cert_start:cert_end]
output.append(Certificate.load(cert_bytes))
return output
def cert_to_merkle_tree_leaves(certBytes: bytes, issuerCertBytes: bytes) -> List[Tuple[bytes, bytes]]:
issuer = asn1crypto.x509.Certificate.load(issuerCertBytes)
cert = asn1crypto.x509.Certificate.load(certBytes)
issuer_key_hash = hashlib.sha256(issuer["tbs_certificate"]["subject_public_key_info"].dump()).digest()
tbsCert = cert["tbs_certificate"]
scts = []
for i in range(len(tbsCert["extensions"])):
if tbsCert["extensions"][i]["extn_id"].native == "signed_certificate_timestamp_list":
# Parse log id, timestamp, extensions out of extension
sct_list = ctl_parser_structures.SignedCertificateTimestampList.parse(
tbsCert["extensions"][i]["extn_value"].parsed.native)
for sct in sct_list.sct_list:
scts.append(sct.sct)
del tbsCert["extensions"][i]
break
tbsCertBytes = tbsCert.dump()
idx += 8 + i2
hash_type = hashDigestType.get(alg2, ('UNKNOWN', 0))
algs_for_zip.append((hash_type, s2))
# if alg_zip_best is None:
# if hash_type:
# alg_zip_best = (hash_type,s)
# else:
# print('unsupport alg', alg2)
# elif alg_zip_best[0] == 'SHA256' and hash_type== 'SHA512':
# alg_zip_best = (hash_type, s)
# else:
# pass
# print(s2)
for certificate in extract_list_by_int_prefix(certificates):
# c = decode(certificate,asn1Spec=Certificate())[0]
c = x509.Certificate.load(certificate)
mycert = cert(c)
cert_cache[mycert.fgprint] = mycert
if mycert.subjkey:
relation_cache[mycert.subjkey] = mycert.fgprint
if mycert.pub_key.dump() == public_key_info.dump():
verified_certs.append(mycert.fgprint)
else:
ret = 'v2PubkeyNotCert'
verified_chains = verify_chain(verified_certs, relation_cache, cert_cache)
all_certs_with_out_object = {}
for fgprint, mycert in cert_cache.items():
all_certs_with_out_object[mycert.fgprint] = mycert.dump()
return algs_for_zip, verified_chains, all_certs_with_out_object