Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init_bad_command(self, exit_mock, caplog):
# preparation
writer = self._mock_writer()
del writer.init_bad_command
output.init('bad_command')
# logging test
for record in caplog.records:
assert record.levelname == 'ERROR'
assert 'bad_command' in caplog.text
assert 'Traceback' in caplog.text
# writer test
writer.error_occurred.assert_called_once_with()
assert writer.exception.call_count == 1
# exit with error
assert exit_mock.called
assert exit_mock.call_count == 1
assert exit_mock.call_args[0] != 0
def test_result_check_ok_color(self, capsys, monkeypatch):
monkeypatch.setattr(output, 'ansi_colors_enabled', True)
writer = output.ConsoleOutputWriter()
output.error_occurred = False
server = 'test'
check = 'test check'
writer.result_check(server, check, True)
(out, err) = capsys.readouterr()
assert out == '\t%s: %sOK%s\n' % (check, GREEN, RESET)
assert err == ''
assert not output.error_occurred
def test_single_result_check_error(self, capsys):
writer = output.NagiosOutputWriter()
output.error_occurred = False
# one server with one error
writer.result_check('a', 'test', False, None)
writer.close()
(out, err) = capsys.readouterr()
assert out == 'BARMAN CRITICAL - server a has issues * ' \
'a FAILED: test\na.test: FAILED\n'
assert err == ''
assert output.error_occurred
assert output.error_exit_code == 2
def test_result_check_ok_hint_color(self, capsys, monkeypatch):
monkeypatch.setattr(output, 'ansi_colors_enabled', True)
writer = output.ConsoleOutputWriter()
output.error_occurred = False
server = 'test'
check = 'test check'
hint = 'do something'
writer.result_check(server, check, True, hint)
(out, err) = capsys.readouterr()
assert out == '\t%s: %sOK%s (%s)\n' % (check, GREEN, RESET, hint)
assert err == ''
assert not output.error_occurred
def teardown_module(module):
"""
Set the output API to a functional state, after testing it
"""
output.set_output_writer(output.DEFAULT_WRITER)
switch_wal,
switch_xlog,
sync_info,
sync_backup,
sync_wals,
]
)
# noinspection PyBroadException
try:
p.dispatch(pre_call=global_config)
except KeyboardInterrupt:
msg = "Process interrupted by user (KeyboardInterrupt)"
output.error(msg)
except Exception as e:
msg = "%s\nSee log file for more details." % e
output.exception(msg)
# cleanup output API and exit honoring output.error_occurred and
# output.error_exit_code
output.close_and_exit()
target_name,
target_time,
target_tli,
target_xid,
target_lsn,
target_immediate,
target_action)
# Retrieve the safe_horizon for smart copy
self._retrieve_safe_horizon(recovery_info, backup_info, dest)
# check destination directory. If doesn't exist create it
try:
recovery_info['cmd'].create_dir_if_not_exists(dest)
except FsOperationFailed as e:
output.error("unable to initialise destination directory "
"'%s': %s", dest, e)
output.close_and_exit()
# Initialize tablespace directories
if backup_info.tablespaces:
self._prepare_tablespaces(backup_info,
recovery_info['cmd'],
dest,
tablespaces)
# Copy the base backup
output.info("Copying the base backup.")
try:
self._backup_copy(
backup_info, dest,
tablespaces, remote_command,
recovery_info['safe_horizon'])
# Take care of the backup lock.
# Only one process can modify a backup a a time
with ServerBackupIdLock(self.config.barman_lock_directory,
self.config.name,
backup_info.backup_id):
orig_status = backup_info.status
self.backup_manager.check_backup(backup_info)
if orig_status == backup_info.status:
output.debug(
"Check finished: the status of backup %s of server %s "
"remains %s",
backup_info.backup_id,
self.config.name,
backup_info.status)
else:
output.debug(
"Check finished: the status of backup %s of server %s "
"changed from %s to %s",
backup_info.backup_id,
self.config.name,
orig_status,
backup_info.status)
except LockFileBusy:
# If another process is holding the backup lock,
# warn the user and terminate
output.error(
"Another process is holding the lock for "
"backup %s of server %s." % (
backup_info.backup_id, self.config.name))
return
except LockFilePermissionDenied as e:
def handler(line):
if line:
if prefix:
output.info("%s%s", prefix, line)
else:
output.info("%s", line)
self.config.name,
orig_status,
backup_info.status)
except LockFileBusy:
# If another process is holding the backup lock,
# warn the user and terminate
output.error(
"Another process is holding the lock for "
"backup %s of server %s." % (
backup_info.backup_id, self.config.name))
return
except LockFilePermissionDenied as e:
# We cannot access the lockfile.
# warn the user and terminate
output.error("Permission denied, unable to access '%s'" % e)
return