Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def sshConfig(self, name):
p = Popen(['vagrant', 'ssh-config', name],
stdout=PIPE,
stderr=None,
stdin=None,
cwd='tests/',
)
p.wait()
if p.returncode != 0:
raise RuntimeError('Could not get ssh-config for ' + repr(name))
ssh_config = paramiko.SSHConfig()
ssh_config.parse(p.stdout)
p.stdout.close()
return ssh_config.lookup(name)
def _test_connection(self, **kwargs):
"""
(Most) kwargs get passed directly into SSHClient.connect().
The exception is ... no exception yet
"""
host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
public_host_key = paramiko.RSAKey(data=host_key.asbytes())
self.tc = paramiko.SSHClient()
self.tc.set_missing_host_key_policy(paramiko.WarningPolicy())
self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port),
'ssh-rsa', public_host_key)
self.tc.connect(hostname=self.addr, port=self.port, username=self.username, gss_host=self.hostname,
gss_auth=True, **kwargs)
self.event.wait(1.0)
self.assert_(self.event.is_set())
self.assert_(self.ts.is_active())
self.assertEquals(self.username, self.ts.get_username())
self.assertEquals(True, self.ts.is_authenticated())
stdin, stdout, stderr = self.tc.exec_command('yes')
schan = self.ts.accept(1.0)
schan.send('Hello there.\n')
schan.send_stderr('This is on stderr.\n')
def write(self, bucket, key, body):
path = posixpath.join(self._dir, key)
ssh = paramiko.SSHClient()
ssh.load_system_host_keys()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect("127.0.0.1")
sftp = ssh.open_sftp()
try:
sftp.stat(path)
sftp.remove(path)
except IOError:
pass
stdin, stdout, stderr = ssh.exec_command(
"mkdir -p $(dirname {})".format(path)
)
self.assertEqual(stdout.channel.recv_exit_status(), 0)
def __init__(self, host, sg_platform="centos", username=None, password=None):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.client = client
self.host = host
self.sg_platform = sg_platform
if "[" in self.host:
self.host = self.host.replace("[", "")
self.host = self.host.replace("]", "")
self.username = ansible.constants.DEFAULT_REMOTE_USER
if username is not None:
self.username = username
self.password = password
def test_multiple_key_files_failure(self):
"""
Expect failure when multiple keys in play and none are accepted
"""
# Until #387 is fixed we have to catch a high-up exception since
# various platforms trigger different errors here >_<
self.assertRaises(SSHException,
self._test_connection,
key_filename=[test_path('test_rsa.key')],
allowed_keys=['ecdsa-sha2-nistp256'],
)
def __init__(self, hostname, port, username, private_ssh_key_filename):
self.hostname = hostname
self.port = port
self.username = username
#print "Created TestingNode with hostname %s, port %i, username %s" % (hostname, port, username)
# read private key from file to get access to the node
if True: # Always use RSA for now
self.private_ssh_key = paramiko.RSAKey(filename=private_ssh_key_filename)
else:
self.private_ssh_key = paramiko.DSSKey(filename=private_ssh_key_filename)
self.global_lock_file = "/tmp/cloudtest_lock"
system_random = random.SystemRandom()
self.global_build_path = "/tmp/cloudtest_build_" + str(system_random.randint(10000000, 99999999));
self.global_bench_path = "/tmp/cloudtest_bench_" + str(system_random.randint(10000000, 99999999));
self.global_test_path = "/tmp/cloudtest_test_" + str(system_random.randint(10000000, 99999999));
#print "Installing build into %s\n" % self.global_build_path
self.basedata_installed = False
self.ssh_transport = None
timeout_seconds=60 * 3,
sleep_seconds=10,
waiting_for="any alive DHCP agent for instance network",
log=False)
else:
proxy_nodes = [proxy_node]
for node in proxy_nodes:
for pkey in env.admin_ssh_keys:
ip = env.find_node_by_fqdn(node).data['ip']
proxy = NetNsProxy(ip=ip, pkey=pkey, ns=dhcp_namespace,
proxy_to_ip=vm_ip)
proxies.append(proxy)
instance_keys = []
if vm_keypair is not None:
instance_keys.append(paramiko.RSAKey.from_private_key(six.StringIO(
vm_keypair.private_key)))
return SSHClient(vm_ip,
port=22,
username=username,
password=password,
private_keys=instance_keys,
proxies=proxies)
def test_login_false_username_ssh(self, credentials):
"""Verify AuthenticationException in case Incorrect username for ssh object.
"""
ssh_conn = clissh.CLISSH(credentials[0])
with pytest.raises(paramiko.AuthenticationException):
ssh_conn = clissh.CLISSH(credentials[0])
ssh_conn.login(ssh_conn.randstr(30), credentials[2], timeout=5)
def test_utils_ssh_connect_exception(mock_connect, mock_sleep, mock_time):
"""Test exception raised connecting to ssh."""
mock_connect.side_effect = paramiko.ssh_exception.SSHException('ERROR!')
mock_sleep.return_value = None
mock_time.side_effect = [0, 0, 2]
with pytest.raises(IpaSSHException) as error:
ipa_utils.get_ssh_client(
LOCALHOST,
'tests/data/ida_test',
timeout=1
)
assert str(error.value) == 'Attempt to establish SSH connection failed.'
assert mock_connect.call_count > 0
def __init__(self, *args, **kwargs):
wat = "You're giving ssh_config explicitly, please use Config_!"
assert "ssh_config" not in kwargs, wat
# Give ssh_config explicitly -> shorter way of turning off loading
kwargs["ssh_config"] = SSHConfig()
super(Config, self).__init__(*args, **kwargs)