Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test(signature_source):
# TODO replace me!
data = signature_source.encode(encoding="utf-8")
print(type(data))
private_key = ec.generate_private_key(
ec.SECP256K1(), default_backend()
)
serialized_private = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(b'cpchain@2018')
# encryption_algorithm=serialization.NoEncryption,
)
print("private key:")
pls=serialized_private.splitlines()
for p in pls:
print(p.decode("utf-8"))
signature = private_key.sign(
data, ec.ECDSA(hashes.SHA256()))
print("hex sign:" + ByteToHex(signature))
def fqdns_from_certificate(cert_data):
try:
cert = x509.load_pem_x509_certificate(cert_data, default_backend())
except ValueError:
pass
try:
cert = x509.load_der_x509_certificate(cert_data, default_backend())
except ValueError:
raise ValueError("No recognized cert format. Allowed: PEM or DER")
names = set()
names.add(cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value.lower().rstrip('.'))
try:
alt_names = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
except x509.extensions.ExtensionNotFound:
alt_names = None
if alt_names:
for alt_name in alt_names.value.get_values_for_type(x509.DNSName):
names.add(alt_name.lower().rstrip('.'))
return list(sorted(names))
def load_key(pubkey):
"""Load public RSA key, with work-around for keys using
incorrect header/footer format.
Read more about RSA encryption with cryptography:
https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
"""
try:
return load_pem_public_key(pubkey.encode(), default_backend())
except ValueError:
# workaround for https://github.com/travis-ci/travis-api/issues/196
pubkey = pubkey.replace('BEGIN RSA', 'BEGIN').replace('END RSA', 'END')
return load_pem_public_key(pubkey.encode(), default_backend())
def get_snowflake_client():
kms = boto3.client('kms')
password = kms.decrypt(
CiphertextBlob=base64.b64decode(os.environ['private_key_password'])
)['Plaintext'].decode()
private_key = serialization.load_pem_private_key(
base64.b64decode(os.environ['private_key']),
password=password.encode(),
backend=default_backend(),
)
pkb = private_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
ctx = snowflake.connector.connect(
user=os.environ['snowflake_user'],
account=os.environ['snowflake_account'],
private_key=pkb,
)
return ctx
def from_encoded_point(cls, algorithm, encoded_point):
"""Creates a Verifier object based on the supplied algorithm and encoded compressed ECC curve point.
:param algorithm: Algorithm on which to base verifier
:type algorithm: aws_encryption_sdk.identifiers.Algorithm
:param bytes encoded_point: ECC public point compressed and encoded with _ecc_encode_compressed_point
:returns: Instance of Verifier generated from encoded point
:rtype: aws_encryption_sdk.internal.crypto.Verifier
"""
return cls(
algorithm=algorithm,
key=_ecc_public_numbers_from_compressed_point(
curve=algorithm.signing_algorithm_info(),
compressed_point=base64.b64decode(encoded_point)
).public_key(default_backend())
)
ValueError: if the initialization_vector is required and not set.
"""
if (cipher_mode != definitions.ENCRYPTION_MODE_ECB and
not initialization_vector):
raise ValueError('Missing initialization vector.')
if cipher_mode == definitions.ENCRYPTION_MODE_CBC:
mode = modes.CBC(initialization_vector)
elif cipher_mode == definitions.ENCRYPTION_MODE_CFB:
mode = modes.CFB(initialization_vector)
elif cipher_mode == definitions.ENCRYPTION_MODE_ECB:
mode = modes.ECB()
elif cipher_mode == definitions.ENCRYPTION_MODE_OFB:
mode = modes.OFB(initialization_vector)
backend = backends.default_backend()
cipher = ciphers.Cipher(algorithm, mode=mode, backend=backend)
super(CryptographyBlockCipherDecrypter, self).__init__()
self._algorithm = algorithm
self._cipher_context = cipher.decryptor()
def __init__(self, key=None, **kwargs):
"""Initializes a decrypter.
Args:
key (Optional[bytes]): key.
kwargs (dict): keyword arguments depending on the decrypter.
Raises:
ValueError: when key is not set.
"""
if not key:
raise ValueError('Missing key.')
algorithm = algorithms.ARC4(key)
backend = backends.default_backend()
cipher = ciphers.Cipher(algorithm, mode=None, backend=backend)
super(RC4Decrypter, self).__init__(**kwargs)
self._cipher_context = cipher.decryptor()
def load_key(pubkey):
"""Load public RSA key.
Work around keys with incorrect header/footer format.
Read more about RSA encryption with cryptography:
https://cryptography.io/latest/hazmat/primitives/asymmetric/rsa/
"""
try:
return load_pem_public_key(pubkey.encode(), default_backend())
except ValueError:
# workaround for https://github.com/travis-ci/travis-api/issues/196
pubkey = pubkey.replace('BEGIN RSA', 'BEGIN').replace('END RSA', 'END')
return load_pem_public_key(pubkey.encode(), default_backend())
def hmac_hash(self, key, data):
hmac = HMAC(key=key, algorithm=hashes.BLAKE2b(64), backend=default_backend())
hmac.update(data=data)
return hmac.finalize()
def issue_certificate(cn, ca_cert, ca_key,
organizations=(),
san_dns=(),
san_ips=(),
key_size=2048,
certify_days=365,
is_web_server=False,
is_web_client=False):
ca_cert = x509.load_pem_x509_certificate(ca_cert, default_backend())
ca_key = serialization.load_pem_private_key(ca_key, password=None, backend=default_backend())
ca_key_id = x509.SubjectKeyIdentifier.from_public_key(ca_key.public_key())
key = rsa.generate_private_key(public_exponent=65537, key_size=key_size, backend=default_backend())
subject_name_attributes = [x509.NameAttribute(NameOID.COMMON_NAME, cn)]
subject_name_attributes += [x509.NameAttribute(NameOID.ORGANIZATION_NAME, org) for org in organizations]
subject = x509.Name(subject_name_attributes)
now = datetime.datetime.utcnow()
cert = x509.CertificateBuilder() \
.subject_name(subject) \
.issuer_name(ca_cert.issuer) \
.public_key(key.public_key()) \
.serial_number(x509.random_serial_number()) \
.not_valid_before(now) \
.not_valid_after(now + datetime.timedelta(days=certify_days)) \
.add_extension(x509.AuthorityKeyIdentifier(ca_key_id.digest,
[x509.DirectoryName(ca_cert.issuer)],
ca_cert.serial_number),