Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, crypto_backend=default_backend()):
self.crypto_backend = crypto_backend
self.token_type = 'JWT'
self.signing_algorithm = 'ES256K'
self.signing_function = ec.ECDSA(hashes.SHA256())
def get_by_key_length(self, key_length):
for curve in self.ecdsa_curves:
if curve.key_length == key_length:
return curve
class ECDSAKey(PKey):
"""
Representation of an ECDSA key which can be used to sign and verify SSH2
data.
"""
_ECDSA_CURVES = _ECDSACurveSet(
[
_ECDSACurve(ec.SECP256R1, "nistp256"),
_ECDSACurve(ec.SECP384R1, "nistp384"),
_ECDSACurve(ec.SECP521R1, "nistp521"),
]
)
def __init__(
self,
msg=None,
data=None,
filename=None,
password=None,
vals=None,
file_obj=None,
validate_point=True,
):
self.verifying_key = None
def sign(pri_key_string, raw_data, password=None):
try:
password = examine_password(password)
loaded_private_key = serialization.load_pem_private_key(
pri_key_string,
password=password,
backend=default_backend()
)
signature_string = loaded_private_key.sign(
raw_data,
ec.ECDSA(hashes.SHA256()))
return Encoder.bytes_to_base64_str(signature_string)
except Exception:
logger.exception("pem sign error")
return None
:type digestAlgorithm: int from DigestAlgorithm
:return: The signature Blob, or an isNull Blob if this private key is
not initialized.
:rtype: Blob
:raises TpmPrivateKey.Error: For unrecognized digestAlgorithm or an
error in signing.
"""
if digestAlgorithm != DigestAlgorithm.SHA256:
raise TpmPrivateKey.Error(
"TpmPrivateKey.sign: Unsupported digest algorithm")
if self._keyType == KeyType.RSA:
signature = self._privateKey.sign(
data, padding.PKCS1v15(), hashes.SHA256())
elif self._keyType == KeyType.EC:
signature = self._privateKey.sign(data, ec.ECDSA(hashes.SHA256()))
else:
return Blob()
return Blob(bytearray(signature), False)
def get_ECDH_parameters():
"""Utility for initialization of ECDH parameters"""
global encryptors, decryptors
private_key = ec.generate_private_key(ec.SECP384R1(), default_backend())
serialized_pubkey = private_key.public_key().public_numbers().encode_point()
key_name = None
while key_name is None:
key_name = os.urandom(4)
if key_name in encryptors:
key_name = None
encryptors[key_name] = None
decryptors[key_name] = None
return private_key, serialized_pubkey, key_name
def hex_to_key(pub_key_hex):
pub_key_hex = pub_key_hex.strip()
pub_key_point = pub_key_hex.decode('hex')
public_numbers = ec.EllipticCurvePublicNumbers.from_encoded_point(ec.SECP256K1(), pub_key_point)
public_key = public_numbers.public_key(backend)
return public_key
def verify(public_key, data, signature):
"""
Verify a signature based on the original public key and data.
"""
deserialized_public_key = serialization.load_pem_public_key(
public_key.encode('utf-8'),
default_backend()
)
(r, s) = signature
try:
deserialized_public_key.verify(
encode_dss_signature(r, s),
json.dumps(data).encode('utf-8'),
ec.ECDSA(hashes.SHA256())
)
return True
except InvalidSignature:
return False
# Rsa public key.
return Jwk(key_type, parsed_key.get("kid", ""), algorithm, None, None,
rsa_pub_numbers.public_key(backends.default_backend()))
elif key_type == "EC":
if parsed_key["crv"] == "P-256":
curve = ec.SECP256R1()
elif parsed_key["crv"] == "P-384":
curve = ec.SECP384R1()
elif parsed_key["crv"] == "P-521":
curve = ec.SECP521R1()
else:
raise exceptions.UnsupportedAlgorithm(
"Unknown curve: %s" % (parsed_key["crv"]))
if parsed_key.get("d", None) is not None:
# Ecdsa private key.
priv_key = ec.derive_private_key(
jwsutil.b64_to_int(parsed_key["d"]), curve,
backends.default_backend())
return Jwk(key_type, parsed_key.get("kid", ""), algorithm, None,
priv_key, priv_key.public_key())
else:
# Ecdsa public key.
ec_pub_numbers = ec.EllipticCurvePublicNumbers(
jwsutil.b64_to_int(parsed_key["x"]),
jwsutil.b64_to_int(parsed_key["y"]), curve)
pub_key = ec_pub_numbers.public_key(backends.default_backend())
return Jwk(key_type, parsed_key.get("kid", ""), algorithm, None, None,
pub_key)
elif key_type == "oct":
sym_key = jwsutil.urlsafe_b64decode(parsed_key["k"])
return Jwk(key_type, parsed_key.get("kid", ""), algorithm, sym_key)
else:
:return: None
'''
if self.config['key']['key_type'] == 'rsa':
self.key = rsa.generate_private_key(
public_exponent=65537,
key_size=self.config['key']['key_peram'],
backend=default_backend()
)
elif self.config['key']['key_type'] == 'dsa':
self.key = dsa.generate_private_key(
key_size=self.config['key']['key_peram'],
backend=default_backend()
)
elif self.config['key']['key_type'] == 'ec':
self.key = ec.generate_private_key(
curve=getattr(ec, self.config['key']['key_peram'])(),
backend=default_backend()
)
else:
raise ValueError("\nFailed to generate key, no key_type key type provided!\n"
"Offending key name: {}\n".format(self.name))
def hextokeys(private_key_hex, public_key_hex):
s = int(private_key_hex, 16)
x = int(public_key_hex[2:66], 16)
y = int(public_key_hex[66:], 16)
keynums = ec.EllipticCurvePrivateNumbers(s, ec.EllipticCurvePublicNumbers(x, y, ec.SECP256R1()))
private_key = keynums.private_key(default_backend())
return (private_key, private_key.public_key())