Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def fernet_and_key():
key = Fernet.generate_key()
fernet = Fernet(key)
return fernet, base64.urlsafe_b64decode(key)
def test_rotate(self, backend):
f1 = Fernet(base64.urlsafe_b64encode(b"\x00" * 32), backend=backend)
f2 = Fernet(base64.urlsafe_b64encode(b"\x01" * 32), backend=backend)
mf1 = MultiFernet([f1])
mf2 = MultiFernet([f2, f1])
plaintext = b"abc"
mf1_ciphertext = mf1.encrypt(plaintext)
assert mf2.decrypt(mf1_ciphertext) == plaintext
rotated = mf2.rotate(mf1_ciphertext)
assert rotated != mf1_ciphertext
assert mf2.decrypt(rotated) == plaintext
with pytest.raises(InvalidToken):
mf1.decrypt(rotated)
def reencrypt_variables_connections(old_fernet_key_str, new_fernet_key_str):
old_fernet_key = fernet.Fernet(old_fernet_key_str.encode("utf-8"))
new_fernet_key = fernet.Fernet(new_fernet_key_str.encode("utf-8"))
db = connector.connect(
host="127.0.0.1",
user="root",
database="airflow-db",
)
variable_cursor = db.cursor()
variable_cursor.execute("SELECT id, val, is_encrypted FROM variable")
rows = variable_cursor.fetchall()
for row in rows:
id = row[0]
val = row[1]
is_encrypted = row[2]
if is_encrypted:
updated_val = new_fernet_key.encrypt(
old_fernet_key.decrypt(bytes(val))).decode()
makedirs(key_dir)
h = hashlib.new("ripemd160")
selected_account = account.upper() if account is not None else config.account.upper()
if sys.version_info[0] >= 3:
account_bytes = bytes(selected_account, CallerLookupKeys.UTF8)
else:
account_bytes = bytes(selected_account)
h.update(account_bytes)
key_path = join(key_dir, ".{0}".format(h.hexdigest()))
system_key, system_key_str = __get_system_key(selected_account)
log_debug(config, "SYSTEM_KEY", system_key_str, system_key)
f = Fernet(key=system_key)
if not isfile(key_path):
if not isdir(key_dir):
makedirs(key_dir)
key = Fernet.generate_key()
with open(key_path, "w") as file:
encoded = b64encode(key)
file.write(f.encrypt(encoded).decode(CallerLookupKeys.UTF8))
log_debug(config, "CRYPTO_KEY_CREATED", selected_account, key_path)
if osname == 'nt':
try:
import ctypes
ctypes.windll.kernel32.SetFileAttributesW(key_path, 0x02)
except:
ignore = True
with open(key_path, "r") as file:
if sys.version_info[0] >= 3:
data = bytes(file.read(), encoding=CallerLookupKeys.UTF8)
else:
data = bytes(file.read())
try:
def get_fernet():
"""Return a fernet object. Object is encoded on module level.
Fernet object gets a long time to create so we are creating it only once
per runtime.
"""
if _FERNET_KEY not in _CACHE:
try:
key = six.b(getattr(settings, 'DJANGO_NUMERICS_SECRET_KEY', ''))
if not key:
# same error if key is not hexedecimal.
# that way 2 errors will be handled in same place.
raise binascii.Error
_CACHE[_FERNET_KEY] = Fernet(key)
except binascii.Error:
raise ImproperlyConfigured('DJANGO_NUMERICS_SECRET_KEY must be a '
'hexedecimal value. Here is one that '
'is randomly generated for you ;) '
'{}'.format(Fernet.generate_key()))
return _CACHE[_FERNET_KEY]
def cache_encryption_key(self):
"""TODO
"""
if self.encryption_bytestring_key:
self.encryption_key = Fernet(self.encryption_bytestring_key)
else:
self.encryption_key = None
def cryptography_decrypt_v1(value, encryption_key=None):
encryption_key = get_valid_encryption_key(encryption_key, fix_length=True)
encoded_key = base64.b64encode(encryption_key.encode('utf-8'))
sym = fernet.Fernet(encoded_key)
try:
return sym.decrypt(encodeutils.safe_encode(value))
except fernet.InvalidToken:
raise exception.InvalidEncryptionKey()
def __init__(self, cookie_name='vibora', secret_key=None):
super().__init__(cookie_name=cookie_name)
self.cipher = Fernet(secret_key)
def decrypt_password(key, password_encrypted):
cipher_suite = Fernet(key)
password_decoded = cipher_suite.decrypt(password_encrypted)
return password_decoded
command = ['pkcs15-crypt', '--decipher', '--raw', '--pkcs', '--input', fname.name]
if card_slot is not None:
command.extend(['--reader', str(card_slot)])
command.extend(['--pin', '-'])
proc = Popen(command, stdout=PIPE, stdin=PIPE, stderr=DEVNULL)
stdout, _ = proc.communicate(input=passphrase.encode('UTF-8'))
os.unlink(fname.name)
try:
plaintext_derived_key = stdout
except IndexError:
raise DecryptionError(stdout)
if proc.returncode != 0:
raise DecryptionError(stdout)
fern = Fernet(plaintext_derived_key)
plaintext_string = fern.decrypt(handle_python_strings(ciphertext_string))
return plaintext_string.decode("UTF-8")