Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
popen_result = mocker.MagicMock(stdout=stdout, stderr=stderr, returncode=0)
mocker.patch.object(vorta.borg.borg_thread, 'Popen', return_value=popen_result)
qtbot.mouseClick(add_repo_window.saveButton, QtCore.Qt.LeftButton)
with qtbot.waitSignal(add_repo_window.thread.result, timeout=3000) as blocker:
pass
main.repoTab.process_new_repo(blocker.args[0])
qtbot.waitUntil(lambda: EventLogModel.select().count() == 2)
assert EventLogModel.select().count() == 2
assert RepoModel.get(id=2).url == test_repo_url
from vorta.utils import keyring
assert keyring.get_password("vorta-repo", RepoModel.get(id=2).url) == LONG_PASSWORD
def test_create(app, borg_json_output, mocker, qtbot):
    """Start a backup from the main window against a mocked ``Popen``.

    The process spawned through ``vorta.borg.borg_thread.Popen`` is replaced
    by a mock that exposes the stdout/stderr returned by
    ``borg_json_output('create')`` and exits with return code 0. The test then
    checks the database rows and the widgets once the UI reports completion.
    """
    window = app.main_window
    out, err = borg_json_output('create')
    fake_process = mocker.MagicMock(stdout=out, stderr=err, returncode=0)
    mocker.patch.object(vorta.borg.borg_thread, 'Popen', return_value=fake_process)

    def progress_reports_finished():
        return window.createProgressText.text().startswith('Backup finished.')

    def start_button_enabled():
        return window.createStartBtn.isEnabled()

    qtbot.mouseClick(window.createStartBtn, QtCore.Qt.LeftButton)

    # Wait for the progress text first, then for the start button to be
    # enabled again, before inspecting any state.
    qtbot.waitUntil(progress_reports_finished, timeout=3000)
    qtbot.waitUntil(start_button_enabled, timeout=3000)

    # Database state after the run.
    assert EventLogModel.select().count() == 1
    assert ArchiveModel.select().count() == 2
    assert RepoModel.get(id=1).unique_size == 15520474

    # Widget state after the run.
    assert window.createStartBtn.isEnabled()
    assert window.archiveTab.archiveTable.rowCount() == 2
    assert window.scheduleTab.logTableWidget.rowCount() == 1
def process_new_repo(self, result):
    """Attach a newly added repository to the current profile and select it.

    ``result`` is a dict carrying a ``returncode`` entry; when that is 0 the
    repository is looked up by ``result['params']['repo_url']``. Any other
    return code leaves the profile and the widgets untouched.
    """
    if result['returncode'] != 0:
        return

    added_repo = RepoModel.get(url=result['params']['repo_url'])

    # Point the current profile at the new repository and persist it.
    current_profile = self.profile()
    current_profile.repo = added_repo.id
    current_profile.save()

    # Append the repository to the selector and make it the current entry
    # (the newly added item is the last one).
    selector = self.repoSelector
    selector.addItem(added_repo.url, added_repo.id)
    selector.setCurrentIndex(selector.count() - 1)

    self.repo_added.emit()
    self.init_repo_stats()
def process_result(self, result):
    """Record the archive described by a finished backup run.

    Nothing is done unless ``result['returncode']`` is 0 or 1 and
    ``result['data']`` contains an ``archive`` entry. Otherwise the archive
    is fetched or created by its snapshot id and saved. If the row was newly
    created and cache statistics are present, they are copied onto the
    repository row. Finally a 'Backup finished.' log event is emitted.
    """
    if result['returncode'] not in [0, 1] or 'archive' not in result['data']:
        return

    payload = result['data']
    archive_info = payload['archive']

    archive_row, is_new = ArchiveModel.get_or_create(
        snapshot_id=archive_info['id'],
        defaults={
            'name': archive_info['name'],
            'time': parser.parse(archive_info['start']),
            'repo': result['params']['repo_id'],
            'duration': archive_info['duration'],
            'size': archive_info['stats']['deduplicated_size']
        }
    )
    archive_row.save()

    # Repository statistics are only refreshed for archives that were not
    # already known.
    if 'cache' in payload and is_new:
        cache_stats = payload['cache']['stats']
        repo_row = RepoModel.get(id=result['params']['repo_id'])
        repo_row.total_size = cache_stats['total_size']
        repo_row.unique_csize = cache_stats['unique_csize']
        repo_row.unique_size = cache_stats['unique_size']
        repo_row.total_unique_chunks = cache_stats['total_unique_chunks']
        repo_row.save()

    self.app.backup_log_event.emit(self.tr('Backup finished.'))
def repo_unlink_action(self):
    """Unlink the repository currently chosen in the repo selector.

    Selector entries at index 0-2 are never unlinked (presumably placeholder
    or action items; confirm against where the selector is populated). For
    any other entry the repository and its archives are deleted from the
    database, the profile's repo reference is cleared, the entry is removed
    from the selector and a confirmation dialog is shown.
    """
    current_profile = self.profile()
    self.init_repo_stats()

    notice = QMessageBox()
    notice.setStandardButtons(QMessageBox.Ok)
    notice.setParent(self, QtCore.Qt.Sheet)

    repo_id = self.repoSelector.currentData()
    repo_index = self.repoSelector.currentIndex()
    if repo_index <= 2:
        return

    repo_row = RepoModel.get(id=repo_id)
    ArchiveModel.delete().where(ArchiveModel.repo_id == repo_row.id).execute()
    current_profile.repo = None
    current_profile.save()
    repo_row.delete_instance(recursive=True)  # This also deletes archives.

    # Move the selection to the first entry before removing the unlinked one.
    self.repoSelector.setCurrentIndex(0)
    self.repoSelector.removeItem(repo_index)

    notice.setText(self.tr('Repository was Unlinked'))
    notice.setInformativeText(self.tr('You can always connect it again later.'))
    notice.exec_()

    self.repo_changed.emit()
    self.init_repo_stats()