Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __str__(self):
return self.description
class APIError(Error, _API.Error):
def __init__(self, code, name, description):
_API.Error.__init__(self, code, name, description)
class NotSubscribed(Error):
DESC = """\
Backups are not yet enabled for your TurnKey Hub account. Log
into the Hub and go to the "Backups" section for instructions."""
def __init__(self, desc=DESC):
Error.__init__(self, desc)
class InvalidBackupError(Error):
pass
class API(_API):
def request(self, method, url, attrs={}, headers={}):
try:
return _API.request(self, method, url, attrs, headers)
except self.Error, e:
if e.name == "BackupRecord.NotFound":
raise InvalidBackupError(e.description)
if e.name in ("BackupAccount.NotSubscribed",
"BackupAccount.NotFound"):
raise NotSubscribed()
raise APIError(e.code, e.name, e.description)
def __init__(self, apikey):
apikey = str(apikey)
self.encoded = apikey
padded = "A" * (20 - len(apikey)) + apikey
try:
uid, secret = struct.unpack("!L8s", base64.b32decode(padded + "=" * 4))
except TypeError:
raise Error("Invalid characters in API-KEY")
self.uid = uid
self.secret = secret
@classmethod
def get_sub_apikey(cls, apikey):
"""Check that APIKey is valid and return subkey"""
apikey = APIKey(apikey)
user = dummydb.get_user(apikey.uid)
if not user or user.apikey != apikey:
raise Error("invalid APIKey: %s" % apikey)
return apikey.subkey(cls.SUBKEY_NS)
if isinstance(subcls, type) and issubclass(subcls, BaseCredentials))
creds_type = d.get('type')
kwargs = d.copy()
try:
del kwargs['type']
except KeyError:
pass
# implicit devpay if no type (backwards compat with existing registry)
if not creds_type:
return cls.DevPay(**kwargs)
if creds_type not in creds_types:
raise Error('unknown credentials type "%s"' % creds_type)
return(creds_types[creds_type](**kwargs))
passphrase = get_passphrase()
key = keypacket.fmt(registry.secret, passphrase)
hbr = registry.hbr
# after we setup a backup record
# only save key to registry if update_key works
if hbr:
try:
hb.update_key(hbr.backup_id, key)
registry.key = key
print ("Updated" if passphrase else "Removed") + \
" passphrase - uploaded key to Hub."
except hub.Error:
raise
else:
registry.key = key
except KeyError:
pass
# implicit devpay if no type (backwards compat with existing registry)
if not creds_type:
return cls.DevPay(**kwargs)
if creds_type not in creds_types:
raise Error('unknown credentials type "%s"' % creds_type)
return(creds_types[creds_type](**kwargs))
class Backups:
API_URL = os.getenv('TKLBAM_APIURL', 'https://hub.turnkeylinux.org/api/backup/')
Error = Error
class NotInitialized(Error):
pass
def __init__(self, subkey=None):
if subkey is None:
raise self.NotInitialized("no APIKEY - tklbam not linked to the Hub")
self.subkey = subkey
self.api = API()
def _api(self, method, uri, attrs={}):
headers = { 'subkey': str(self.subkey) }
return self.api.request(method, self.API_URL + uri, attrs, headers)
@classmethod
def get_sub_apikey(cls, apikey):
response = API().request('GET', cls.API_URL + 'subkey/', {'apikey': apikey})
def get_new_profile(self, profile_id, profile_timestamp):
"""
Gets a profile for that is newer than .
If there's a new profile, returns a DummyProfileArchive instance.
Otherwise returns None.
Raises an exception if no profile exists for profile_id.
"""
if not self.user.credentials:
raise NotSubscribed()
archive = dummydb.get_profile(profile_id)
if not archive:
raise Error(404, 'BackupArchive.NotFound', 'Backup profile archive not found: ' + profile_id)
archive_timestamp = int(os.stat(archive).st_mtime)
if profile_timestamp and profile_timestamp >= archive_timestamp:
return None
return DummyProfileArchive(profile_id, archive, archive_timestamp)
def __init__(self, desc=DESC):
Error.__init__(self, desc)
dummydb = _DummyDB("/var/tmp/tklbam/dummyhub")
class DummyProfileArchive(ProfileArchive):
def __del__(self):
pass
class Backups:
# For simplicity's sake this implements a dummy version of both
# client-side and server-side operations.
#
# When translating to a real implementation the interface should remain
# but the implementation will change completely as only client-side
# operations remain.
Error = Error
class NotInitialized(Error):
pass
SUBKEY_NS = "tklbam"
@classmethod
def get_sub_apikey(cls, apikey):
"""Check that APIKey is valid and return subkey"""
apikey = APIKey(apikey)
user = dummydb.get_user(apikey.uid)
if not user or user.apikey != apikey:
raise Error("invalid APIKey: %s" % apikey)
return apikey.subkey(cls.SUBKEY_NS)
def __init__(self, subkey):
import tempfile
from datetime import datetime
import executil
from pycurl_wrapper import API as _API
from utils import AttrDict
class Error(Exception):
def __init__(self, description, *args):
Exception.__init__(self, description, *args)
self.description = description
def __str__(self):
return self.description
class APIError(Error, _API.Error):
def __init__(self, code, name, description):
_API.Error.__init__(self, code, name, description)
class NotSubscribed(Error):
DESC = """\
Backups are not yet enabled for your TurnKey Hub account. Log
into the Hub and go to the "Backups" section for instructions."""
def __init__(self, desc=DESC):
Error.__init__(self, desc)
class InvalidBackupError(Error):
pass
class API(_API):
def request(self, method, url, attrs={}, headers={}):