Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_error(self, MockHttpProvider, MockAuthProvider):
    """
    Check that building an HttpResponse from an error payload raises a
    OneDriveError, and that the error exposes the HTTP status code, the
    top-level error code, and every nested ``innererror`` code.
    """
    not_found_body = json.dumps({
        "error": {
            "code": "itemNotFound",
            "message": "The resource could not be found",
        },
    })
    try:
        HttpResponse(404, None, not_found_body)
    except OneDriveError as err:
        assert err.status_code == 404
        assert err.code == ErrorCode.ItemNotFound
    else:
        # Construction is expected to raise; getting here is a failure.
        assert False

    # Three levels of errors: the outer code is reported by ``code`` while
    # ``matches`` is expected to accept any code found in the chain.
    forbidden_body = json.dumps({
        "error": {
            "code": "generalException",
            "message": "TestMessage",
            "innererror": {
                "code": "accessDenied",
                "message": "TestMessage",
                "innererror": {
                    "code": "unauthenticated",
                    "message": "TestMessage",
                },
            },
        },
    })
    try:
        HttpResponse(403, None, forbidden_body)
    except OneDriveError as err:
        assert err.status_code == 403
        assert err.code == ErrorCode.GeneralException
        assert err.matches(ErrorCode.AccessDenied)
        assert err.matches(ErrorCode.Unauthenticated)
        assert not err.matches(ErrorCode.NotSupported)
    else:
        # Construction is expected to raise; getting here is a failure.
        assert False
def handle(self):
    """
    Delete the remote item and drop its record from the local repository.

    Returns True when both the remote deletion and the repository update
    succeed, and False when either raises OneDriveError or OSError (the
    error is logged).
    """
    logging.info('Deleting remote item "%s".', self.rel_path)
    delete_request = self.get_item_request()
    try:
        # Remote deletion first; the local record is only removed afterwards.
        od_api_helper.item_request_call(self.repo, delete_request.delete)
        self.repo.delete_item(self.item_name, self.parent_relpath, self.is_folder)
        logging.info('Deleted remote item "%s".', self.rel_path)
    except (onedrivesdk.error.OneDriveError, OSError) as err:
        logging.error('Error deleting item "%s": %s.', self.rel_path, err)
        return False
    return True
def get_authenticator_and_drives(context, account_id):
    """
    Build an authenticator for ``account_id`` and fetch the account's drives.

    The saved session is loaded first; if loading it or listing the drives
    raises OneDriveError or RuntimeError, the session is refreshed and the
    drive listing is attempted once more (errors from that second attempt
    propagate to the caller).

    Returns a tuple ``(authenticator, drives)``.
    """
    # TODO: Ideally we should recursively get all drives because the API pages them.
    auth = OneDriveAuthenticator()
    try:
        session_key = od_api_session.get_keyring_key(account_id)
        auth.load_session(key=session_key)
        drive_list = auth.client.drives.get()
    except (onedrivesdk.error.OneDriveError, RuntimeError) as err:
        logging.error('Error loading session: %s. Try refreshing token.', err)
        auth.refresh_session(account_id)
        drive_list = auth.client.drives.get()
    return auth, drive_list
def handle(self):
    """
    Move/rename the remote item and mirror the change in the local repository.

    Returns True on success and False when OneDriveError or OSError is
    raised along the way (the error is logged).
    """
    logging.info('Moving item "%s" to "%s".', self.rel_path, self.new_relpath)
    # The routine assumes that the directory to save the new path exists remotely.
    move_request = self.get_item_request()
    try:
        # Stat the local file at its new location before touching the remote side.
        local_stat = os.stat(self.new_local_abspath)
        moved_item = item_request_call(self.repo, move_request.update, self._get_new_item())
        # TODO: update all records or rebuild records after deletion?
        self.repo.move_item(
            item_name=self.item_name,
            parent_relpath=self.parent_relpath,
            new_name=self.new_name,
            new_parent_relpath=self.new_parent_relpath,
            is_folder=self.is_folder,
        )
        self.update_timestamp_and_record(moved_item, local_stat)
    except (onedrivesdk.error.OneDriveError, OSError) as err:
        logging.error('Error moving item "%s" to "%s": %s.', self.rel_path, self.new_relpath, err)
        return False
    return True
item_mtime, item_mtime_editable = get_item_modified_datetime(self.remote_item)
item_request_call(self.repo, item_request.download, tmp_path)
hashes = self.remote_item.file.hashes
if hashes is None or hashes.sha1_hash is None or hashes.sha1_hash == sha1_value(tmp_path):
item_size_local = os.path.getsize(tmp_path)
os.rename(tmp_path, self.local_abspath)
fix_owner_and_timestamp(self.local_abspath, self.repo.context.user_uid,
datetime_to_timestamp(item_mtime))
self.repo.update_item(self.remote_item, self.parent_relpath, item_size_local)
logging.info('Finished downloading item "%s".', self.remote_item.id)
return True
else:
# We assumed server's SHA-1 value is always correct -- might not be true.
logging.error('Hash mismatch for downloaded file "%s".', self.local_abspath)
os.remove(tmp_path)
except (onedrivesdk.error.OneDriveError, OSError) as e:
logging.error('Error when downloading file "%s": %s.', self.remote_item.id, e)
return False
def client(self):
    """
    Return the client stored in ``ONEDRIVE_CONNECTION``.

    Authenticates first when no client is stored, then probes the stored
    client by fetching the root item; a OneDriveError from that probe
    triggers another authentication before the client is returned.
    """
    if ONEDRIVE_CONNECTION.get('client') is None:
        self.authenticate_onedrive()
    try:
        from onedrivesdk.error import OneDriveError
        # See if the existing client is configured with a valid token.
        stored_client = ONEDRIVE_CONNECTION.get('client')
        stored_client.item(id='root').get()
    except OneDriveError:
        self.authenticate_onedrive()
    return ONEDRIVE_CONNECTION.get('client')
returned_item = item_request_call(self.repo, item_request.get)
else:
logging.info('Uploading large file "%s" in chunks of 10MB.', self.local_abspath)
item_request = self.repo.authenticator.client.item(drive=self.repo.drive.id, path=self.rel_path)
returned_item = item_request_call(self.repo, item_request.upload_async,
local_path=self.local_abspath, upload_status=self.update_progress)
if not isinstance(returned_item, onedrivesdk.Item):
if hasattr(returned_item, '_prop_dict'):
returned_item = onedrivesdk.Item(returned_item._prop_dict)
else:
returned_item = item_request_call(self.repo, item_request.get)
self.update_timestamp_and_record(returned_item, item_stat)
self.task_pool.release_path(self.local_abspath)
logging.info('Finished uploading file "%s".', self.local_abspath)
return True
except (onedrivesdk.error.OneDriveError, OSError) as e:
logging.error('Error uploading file "%s": %s.', self.local_abspath, e)
# TODO: what if quota is exceeded?
if (isinstance(e, onedrivesdk.error.OneDriveError) and
e.code == onedrivesdk.error.ErrorCode.MalwareDetected):
logging.warning('File "%s" was detected as malware by OneDrive. '
'Do not upload during program session.', self.local_abspath)
self.task_pool.occupy_path(self.local_abspath, None)
return False
self.task_pool.release_path(self.local_abspath)
return False
def item_request_call(repo, request_func, *args, **kwargs):
    """
    Call ``request_func(*args, **kwargs)``, retrying on recoverable failures.

    The call is repeated indefinitely while it fails in one of these ways:

    * OneDriveError with code ``ActivityLimitReached``: sleep for
      ``THROTTLE_PAUSE_SEC`` seconds, then retry.
    * OneDriveError with code ``Unauthenticated``: refresh the session via
      ``repo.authenticator.refresh_session(repo.account_id)``, then retry.
    * ``requests.ConnectionError``: sleep for ``THROTTLE_PAUSE_SEC`` seconds,
      then retry.

    :param repo: Object providing ``authenticator`` and ``account_id``; only
        used when the session has to be refreshed.
    :param request_func: Callable performing the API request.
    :param args: Positional arguments forwarded to ``request_func``.
    :param kwargs: Keyword arguments forwarded to ``request_func``.
    :return: Whatever ``request_func`` returns on its first successful call.
    :raise onedrivesdk.error.OneDriveError: For any other API error code.
    """
    while True:
        try:
            return request_func(*args, **kwargs)
        except onedrivesdk.error.OneDriveError as e:
            logging.error('Encountered API Error: %s.', e)
            if e.code == onedrivesdk.error.ErrorCode.ActivityLimitReached:
                time.sleep(THROTTLE_PAUSE_SEC)
            elif e.code == onedrivesdk.error.ErrorCode.Unauthenticated:
                repo.authenticator.refresh_session(repo.account_id)
            else:
                # Not recoverable here: re-raise the active exception as-is.
                raise
        except requests.ConnectionError as e:
            logging.error('Encountered connection error: %s. Retry in %d sec.', e, THROTTLE_PAUSE_SEC)
            time.sleep(THROTTLE_PAUSE_SEC)