Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create(app, borg_json_output, mocker, qtbot):
    """Click the start-backup button and verify the stored records and widgets."""
    window = app.main_window

    # Make the borg thread module's Popen return a mock that carries the
    # canned 'create' output and a zero return code.
    out, err = borg_json_output('create')
    fake_process = mocker.MagicMock(stdout=out, stderr=err, returncode=0)
    mocker.patch.object(vorta.borg.borg_thread, 'Popen', return_value=fake_process)

    qtbot.mouseClick(window.createStartBtn, QtCore.Qt.LeftButton)

    def backup_reported_finished():
        return window.createProgressText.text().startswith('Backup finished.')

    def start_button_enabled():
        return window.createStartBtn.isEnabled()

    # Wait for the progress text first, then for the button to be re-enabled.
    qtbot.waitUntil(backup_reported_finished, timeout=3000)
    qtbot.waitUntil(start_button_enabled, timeout=3000)

    # Database state after the run.
    assert EventLogModel.select().count() == 1
    assert ArchiveModel.select().count() == 2
    assert RepoModel.get(id=1).unique_size == 15520474

    # Widget state after the run.
    assert window.createStartBtn.isEnabled()
    assert window.archiveTab.archiveTable.rowCount() == 2
    assert window.scheduleTab.logTableWidget.rowCount() == 1
def test_repo_list(app, qtbot, mocker, borg_json_output):
    """Trigger the archive tab's list action and verify the refreshed archive records."""
    window = app.main_window
    archive_tab = window.archiveTab

    window.tabWidget.setCurrentIndex(3)
    archive_tab.list_action()
    assert not archive_tab.checkButton.isEnabled()

    # NOTE(review): Popen is patched only after list_action() has been called;
    # confirm this ordering is intentional before rearranging anything here.
    out, err = borg_json_output('list')
    fake_process = mocker.MagicMock(stdout=out, stderr=err, returncode=0)
    mocker.patch.object(vorta.borg.borg_thread, 'Popen', return_value=fake_process)

    def refresh_reported_done():
        return window.createProgressText.text() == 'Refreshing archives done.'

    qtbot.waitUntil(refresh_reported_done, timeout=3000)

    assert ArchiveModel.select().count() == 6
    assert window.createProgressText.text() == 'Refreshing archives done.'
    assert archive_tab.checkButton.isEnabled()
def process_result(self, result):
    """Bring the local archive records in line with a finished archive listing.

    Nothing happens unless ``result['returncode']`` is 0. On success the
    repository row is looked up (or created) by the last element of
    ``result['cmd']``, local archive rows for that repository whose
    ``snapshot_id`` is not in the listing are deleted, and every listed
    archive is looked up or created.

    Args:
        result: Mapping with the keys ``'returncode'``, ``'cmd'`` and
            ``'data'``. When ``'data'`` is falsy it is replaced in place
            with an empty dict. Otherwise its optional ``'archives'`` entry
            is read as an iterable of mappings with the keys ``'id'``,
            ``'name'`` and ``'time'``.
    """
    if result['returncode'] != 0:
        return

    repo, _ = RepoModel.get_or_create(url=result['cmd'][-1])
    if not result['data']:
        result['data'] = {}  # TODO: Workaround for tests. Can't read mock results 2x.
    remote_archives = result['data'].get('archives', [])

    # Collect the remote ids once so each local row needs a single set lookup
    # instead of a full scan of the remote listing.
    remote_ids = {remote['id'] for remote in remote_archives}

    # Delete archives that don't exist on the remote side
    for local_archive in ArchiveModel.select().where(ArchiveModel.repo == repo.id):
        if local_archive.snapshot_id not in remote_ids:
            local_archive.delete_instance()

    # Add remote archives we don't have locally.
    for remote in remote_archives:
        new_archive, _ = ArchiveModel.get_or_create(
            snapshot_id=remote['id'],
            repo=repo.id,
            defaults={
                'name': remote['name'],
                'time': parser.parse(remote['time']),
            },
        )
        # NOTE(review): this save() looks redundant if get_or_create already
        # persists newly created rows; confirm before removing.
        new_archive.save()