Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def read_encrypted(self) -> tink_pb2.EncryptedKeyset:
try:
return json_format.Parse(self._serialized_keyset,
tink_pb2.EncryptedKeyset())
except json_format.ParseError as e:
raise tink_error.TinkError(e)
def read_encrypted(self) -> tink_pb2.EncryptedKeyset:
if not self._serialized_keyset:
raise tink_error.TinkError('No keyset found')
try:
encrypted_keyset = tink_pb2.EncryptedKeyset()
encrypted_keyset.ParseFromString(self._serialized_keyset)
return encrypted_keyset
except message.DecodeError as e:
raise tink_error.TinkError(e)
def write_encrypted(self, encrypted_keyset: tink_pb2.EncryptedKeyset) -> None:
if not isinstance(encrypted_keyset, tink_pb2.EncryptedKeyset):
raise tink_error.TinkError('invalid encrypted keyset.')
self._io_stream.write(encrypted_keyset.SerializeToString())
self._io_stream.flush()
def _encrypt(keyset: tink_pb2.Keyset,
master_key_primitive: aead.Aead) -> tink_pb2.EncryptedKeyset:
"""Encrypts a Keyset and returns an EncryptedKeyset."""
encrypted_keyset = master_key_primitive.encrypt(keyset.SerializeToString(),
b'')
# Check if we can decrypt, to detect errors
try:
keyset2 = tink_pb2.Keyset.FromString(
master_key_primitive.decrypt(encrypted_keyset, b''))
if keyset != keyset2:
raise tink_error.TinkError('cannot encrypt keyset: %s != %s' %
(keyset, keyset2))
except message.DecodeError:
raise tink_error.TinkError('invalid keyset, corrupted key material')
return tink_pb2.EncryptedKeyset(
encrypted_keyset=encrypted_keyset, keyset_info=_keyset_info(keyset))