Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_multiple_key_files_failure(self):
    """
    Expect the connection attempt to fail when no offered key is accepted.

    ``test_rsa.key`` is passed as the only ``key_filename`` entry while
    ``allowed_keys`` lists only ``ecdsa-sha2-nistp256``; the attempt must
    raise `SSHException`.
    """
    # Arguments are built up front so that only the connection attempt
    # itself runs under the assertion.
    offered_key_files = [test_path('test_rsa.key')]
    accepted_key_types = ['ecdsa-sha2-nistp256']
    # Until #387 is fixed we have to catch a high-up exception since
    # various platforms trigger different errors here >_<
    with self.assertRaises(SSHException):
        self._test_connection(
            key_filename=offered_key_files,
            allowed_keys=accepted_key_types,
        )
# NOTE(review): excerpt from the middle of a constructor-like body; the
# enclosing ``def`` line and the original nesting are not visible here.
# d, p and q start out as None -- presumably filled in later by the
# private-key loaders called below; confirm against those helpers.
self.d = None
self.p = None
self.q = None
# A file object, when given, is tried first: load from it and stop.
if file_obj is not None:
self._from_private_key(file_obj, password)
return
# Next choice is a filename: load from that file and stop.
if filename is not None:
self._from_private_key_file(filename, password)
return
# Raw ``data`` is wrapped in a Message only when no ``msg`` was supplied.
if (msg is None) and (data is not None):
msg = Message(data)
# Explicit ``vals`` are unpacked as (e, n) and take priority over ``msg``.
if vals is not None:
self.e, self.n = vals
else:
# Without ``vals`` a message is mandatory.
if msg is None:
raise SSHException('Key object may not be empty')
# The message must start with the text 'ssh-rsa'; e is then read before n.
if msg.get_text() != 'ssh-rsa':
raise SSHException('Invalid key')
self.e = msg.get_mpint()
self.n = msg.get_mpint()
# The key size is the bit length of n.
self.size = util.bit_length(self.n)
# NOTE(review): excerpt from the middle of a function; the enclosing ``def``
# line and the original nesting are not visible here.
# An empty ``id_text`` is treated as a failure to identify the release.
if len(id_text)==0:
raise paramiko.ssh_exception.SSHException("Can not determinte release neither with 'lsb_release' nor with 'cat /etc/redhat-release'")
id_ = id_text.rstrip('\n')
# Run 'uname -o' over the SSH connection; any stderr output aborts with an
# SSHException carrying the command and the error text.
# NOTE(review): ``rstrip('\n')`` and ``command +' : '+ error`` assume read()
# returns str; if it returns bytes these raise TypeError -- confirm the
# targeted Python / paramiko versions.
command = 'uname -o'
(_, stdout, stderr) = ssh_conn.exec_command(command)
error = stderr.read()
if len(error)>0:
raise paramiko.ssh_exception.SSHException(command +' : '+ error)
type_ = stdout.read().rstrip('\n')
# Same procedure for 'uname -i'; its output is kept as ``bit_architecture``.
command = 'uname -i'
(_, stdout, stderr) = ssh_conn.exec_command(command)
error = stderr.read()
if len(error)>0:
raise paramiko.ssh_exception.SSHException(command +' : '+ error)
bit_architecture = stdout.read().rstrip('\n')
# NOTE(review): ``os`` here is called with ``.set(...)``, which the standard
# library ``os`` module does not provide -- presumably a project object
# shadowing that name; confirm.
(return_status, code) = os.set(id_, type_, bit_architecture)
# A falsy status is handed back to the caller unchanged, together with code.
if not return_status:
return (return_status, code)
# Otherwise ``code`` is appended to the accumulated warning text and the
# call reports success.
warning_text += code
return (True, warning_text)
# NOTE(review): excerpt from the middle of a constructor-like body; the
# enclosing ``def`` line, the condition guarding the first ``raise`` and the
# original nesting are not visible here.
raise SSHException(err)
# Load private key material from a file object if given, else from a
# filename if given.
if pkey_file_obj is not None:
self._from_private_key(pkey_file_obj, password)
elif pkey_filename is not None:
self._from_private_key_file(pkey_filename, password)
# TODO: utilize key= kwarg, set to self.key as in RSAKey
# Normalize to a Message, since certificate files are by definition
# stored in message format, even on-disk.
if msg is None and data is not None:
msg = Message(data)
# A message is mandatory from this point on.
if msg is None:
# TODO: better exception, unless RSAKey does exactly this too
raise SSHException('Key object may not be empty')
# The message must start with the RSA certificate type string.
if msg.get_text() != 'ssh-rsa-cert-v01@openssh.com':
# TODO: ditto
raise SSHException('Invalid key')
# From here, we are simply following the RFC's defined message format
# Fields are consumed in order: nonce string, then e, then n.
self.nonce = msg.get_string()
e = msg.get_mpint()
n = msg.get_mpint()
# TODO: bail out if self.key exists & its public numbers != ours!
# Key might've been set by a private key file. If not, set it from the
# cert
if self.key is None:
self.key = rsa.RSAPublicNumbers(e=e, n=n).public_key(
default_backend())
# NOTE(review): excerpt from the middle of a function; the enclosing ``def``
# line, the ``try`` matching the ``except`` below and the original nesting
# are not visible here, so the comments describe single statements only.
if not self._update_available:
# Staging and starting happens only when the SUBMITTED -> STAGING_IN
# transition is reported as successful by try_transition().
if job.try_transition(JobState.SUBMITTED,
JobState.STAGING_IN):
self._stage_and_start_job(job_id, job)
self._logger.debug('Staged and started job')
# Jobs whose state counts as an active cancellation get cancelled.
if JobState.cancellation_active(job.state):
self._cancel_job(job_id, job)
self._logger.debug('State is now ' + job.state.value)
# Deletion needs both the delete request flag and a final state.
if job.please_delete and JobState.is_final(job.state):
self._delete_job(job_id, job)
except (ConnectionError, IOError, OSError, SSHException) as e:
self._logger.debug('System exception while processing job:'
' {}'.format(e))
# IO/OS errors whose text contains none of 'Socket', 'Network' or
# 'Temporary' are reported on the job with a user-facing hint.
if isinstance(e, IOError) or isinstance(e, OSError):
if ('Socket' not in str(e) and
'Network' not in str(e) and
'Temporary' not in str(e)):
job.error('An IO error occurred while uploading the job'
' input data: {}. Please check that your network'
' connection works, and that you have enough'
' disk space or quota on the remote machine.'
''.format(e))
# NOTE(review): which of the conditions above guard the statements below
# cannot be recovered from this excerpt -- check the original indentation.
job.state = JobState.SYSTEM_ERROR
self._logger.critical('An internal error occurred when'
' processing job ' + job.id)
# The full traceback of the active exception is logged as well.
self._logger.critical(traceback.format_exc())
return False
# NOTE(review): excerpt from the middle of a method; the enclosing ``def``
# line, the code computing ``agreed_keys`` and the ``*_algo_list`` values,
# and the original nesting are not visible here.
# The first entry of agreed_keys becomes the host key type.
self.host_key_type = agreed_keys[0]
# In server mode a server key must be available for that type.
if self.server_mode and (self.get_server_key() is None):
raise SSHException('Incompatible ssh peer (can\'t match requested host key type)')
# Cipher selection. In server mode the *_encrypt_algo_list sequences are
# walked and entries present in self._preferred_ciphers are kept, so the
# result follows the order of those lists; otherwise self._preferred_ciphers
# is walked and filtered by membership in the lists, so the result follows
# our preference order. Local/remote map to server/client lists in server
# mode and to client/server lists otherwise.
if self.server_mode:
agreed_local_ciphers = list(filter(self._preferred_ciphers.__contains__,
server_encrypt_algo_list))
agreed_remote_ciphers = list(filter(self._preferred_ciphers.__contains__,
client_encrypt_algo_list))
else:
agreed_local_ciphers = list(filter(client_encrypt_algo_list.__contains__,
self._preferred_ciphers))
agreed_remote_ciphers = list(filter(server_encrypt_algo_list.__contains__,
self._preferred_ciphers))
# Both directions need at least one candidate; the first one is used.
# NOTE(review): the message says 'server' even when server_mode is true.
if (len(agreed_local_ciphers) == 0) or (len(agreed_remote_ciphers) == 0):
raise SSHException('Incompatible ssh server (no acceptable ciphers)')
self.local_cipher = agreed_local_ciphers[0]
self.remote_cipher = agreed_remote_ciphers[0]
self._log_agreement(
'Cipher', local=self.local_cipher, remote=self.remote_cipher
)
# MAC selection mirrors the cipher selection above, using
# self._preferred_macs and the *_mac_algo_list sequences.
if self.server_mode:
agreed_remote_macs = list(filter(self._preferred_macs.__contains__, client_mac_algo_list))
agreed_local_macs = list(filter(self._preferred_macs.__contains__, server_mac_algo_list))
else:
agreed_local_macs = list(filter(client_mac_algo_list.__contains__, self._preferred_macs))
agreed_remote_macs = list(filter(server_mac_algo_list.__contains__, self._preferred_macs))
if (len(agreed_local_macs) == 0) or (len(agreed_remote_macs) == 0):
raise SSHException('Incompatible ssh server (no acceptable macs)')
self.local_mac = agreed_local_macs[0]
self.remote_mac = agreed_remote_macs[0]
def handler(title, instructions, fields):
    """
    Answer an interactive authentication prompt with the enclosing
    ``password``.

    :param title: prompt title (not used)
    :param instructions: prompt instructions (not used)
    :param fields: the fields the prompt asks for
    :return: an empty list when no field is requested, otherwise a
        one-element list holding ``password``
    :raises SSHException: if more than one field is requested
    """
    requested = len(fields)
    if requested == 0:
        # for some reason, at least on os x, a 2nd request will
        # be made with zero fields requested.  maybe it's just
        # to try to fake out automated scripting of the exact
        # type we're doing here.  *shrug* :)
        return []
    if requested > 1:
        # Only a single field can be answered with the one password we hold.
        raise SSHException('Fallback authentication failed.')
    return [password]
# Hand ``username`` and the ``handler`` callable to auth_interactive() and
# return whatever it returns.
return self.auth_interactive(username, handler)
def __init__(self, source, target):
    """Create a BackupMusic object and connect to the backup target.

    ``target`` is split by ``parse_ssh_uri`` into user name, server and
    path. An SSH client is created with the host keys from
    ``~/.ssh/known_hosts`` and connected to the server; on success an SFTP
    session is opened and tagged with the server name.

    If connecting raises ``SSHException`` the error is printed and
    construction stops early, leaving ``self.sftp`` unset.
    """
    self.ask_stored_questions = True
    self.source = source
    # Path separators become underscores, without leading/trailing ones.
    self.target_prefix = source.replace('/', '_').strip('_')
    self.target = target
    self.username, self.server, self.path = parse_ssh_uri(target)

    client = paramiko.SSHClient()
    self.ssh = client
    known_hosts = os.path.join("~", ".ssh", "known_hosts")
    client.load_host_keys(os.path.expanduser(known_hosts))
    self.priorityPatterns = []

    try:
        client.connect(self.server, username=self.username)
    except paramiko.ssh_exception.SSHException as e:
        print(f'Error connecting to {self.server}: {e}')
    else:
        self.sftp = client.open_sftp()
        self.sftp.remote_hostname = self.server
class ChannelException(SSHException):
    """
    Raised when an attempt to open a new `.Channel` fails.

    :param int code: the error code returned by the server
    :param text: the message handed to `SSHException`

    .. versionadded:: 1.6
    """

    def __init__(self, code, text):
        super(ChannelException, self).__init__(text)
        # for unpickling: ``args`` carries both constructor arguments
        self.args = (code, text)
        self.code = code
class BadHostKeyException(SSHException):
    """
    The host key given by the SSH server did not match what we were expecting.

    The exception message names the host and the base64 form of both keys.
    The host name and both key objects are also kept as attributes
    (``hostname``, ``key``, ``expected_key``).

    :param str hostname: the hostname of the SSH server
    :param PKey got_key: the host key presented by the server
    :param PKey expected_key: the host key expected

    .. versionadded:: 1.6
    """

    def __init__(self, hostname, got_key, expected_key):
        message = 'Host key for server {0} does not match: got {1}, expected {2}' # noqa
        message = message.format(
            hostname, got_key.get_base64(),
            expected_key.get_base64())
        SSHException.__init__(self, message)
        self.hostname = hostname
        # Keep both keys so handlers can inspect them instead of having to
        # parse the message text.
        self.key = got_key
        self.expected_key = expected_key

    def __reduce__(self):
        # for unpickling: the default reduction would re-create the exception
        # from ``args`` (just the message), which does not fit the
        # three-argument constructor.  Rebuilding from the stored constructor
        # arguments keeps ``args`` -- and therefore ``str(exc)`` -- unchanged.
        return (
            self.__class__,
            (self.hostname, self.key, self.expected_key),
        )
def _decode_key(self, data):
    """
    Fill in this key's RSA numbers from BER-encoded private key data.

    The private key file contains::

        RSAPrivateKey = { version = 0, n, e, d, p, q,
                          d mod p-1, d mod q-1, q**-1 mod p }

    Sets ``n``, ``e``, ``d``, ``p``, ``q`` and ``size`` (bit length of
    ``n``) on the instance.

    :param data: the encoded key material, passed to ``BER(data).decode()``
    :raises SSHException: if the data cannot be BER-decoded, or does not
        decode to a list of at least six items whose first item (the
        version) is 0
    """
    try:
        keylist = BER(data).decode()
    except BERException:
        raise SSHException('Unable to parse key file')
    # Items 0..5 (version, n, e, d, p, q) are read below, so a shorter list
    # has to be rejected here rather than surfacing later as an IndexError.
    if (
        not isinstance(keylist, list)
        or len(keylist) < 6
        or keylist[0] != 0
    ):
        raise SSHException('Not a valid RSA private key file (bad ber encoding)')
    self.n = keylist[1]
    self.e = keylist[2]
    self.d = keylist[3]
    # not really needed
    self.p = keylist[4]
    self.q = keylist[5]
    self.size = util.bit_length(self.n)