Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_login_false_username_ssh(self, credentials):
    """Verify AuthenticationException is raised for an incorrect SSH username.

    A ``clissh.CLISSH`` object is built from ``credentials[0]`` and a login
    is attempted with a random 30-character username and ``credentials[2]``
    as the second login argument (presumably the password -- confirm
    against the fixture).

    The object is created once, outside the ``pytest.raises`` block, so
    that only the ``login`` call itself is allowed to satisfy the expected
    exception.
    """
    ssh_conn = clissh.CLISSH(credentials[0])
    bogus_username = ssh_conn.randstr(30)
    with pytest.raises(paramiko.AuthenticationException):
        ssh_conn.login(bogus_username, credentials[2], timeout=5)
:type command: str
:param command: command to be run on remote host
:type callback: function
:param callback: function to call when execution completes
"""
try:
logger.debug(('{0}: execute async "{1}"'
'with callback {2}'.format(self.target_address,
command,
callback)))
future = self.executor.submit(self.execute, command)
if callback is not None:
future.add_done_callback(callback)
return future
except (AuthenticationException, SSHException,
ChannelException, SocketError) as ex:
logger.critical(("{0} execution failed on {1} with exception:"
"{2}".format(command, self.target_address,
ex)))
raise SSHCommandError(self.target_address, command, ex)
exit(1)
# Connect `para` (presumably a paramiko Transport created above this
# excerpt -- confirm) as `user`. An EOF or any SSH-level error terminates
# the process with exit status 1.
# NOTE(review): `except X,e:` is Python 2-only syntax and `e` is never used.
try:
para.connect(username=user)
except EOFError,e:
exit(1)
except paramiko.SSHException,e:
exit(1)
# Build a password made of `length` repetitions of the letter 'A'.
passwd = 'A'*length
# Attempt password authentication; both failure types are handled the same
# way. The bare `print` below is the Python 2 print statement and emits an
# empty line.
try:
para.auth_password(user,passwd)
except paramiko.AuthenticationException,e:
print
except paramiko.SSHException,e:
print
# Release the transport and the socket.
# NOTE(review): these calls are not reached on the exit(1) paths above.
para.close()
sock.close()
port=request.values.get('port')
username=request.values.get('username')
pwd=request.values.get('pwd')
#创建ssh链接
sshclient = paramiko.SSHClient()
sshclient.load_system_host_keys()
sshclient.set_missing_host_key_policy(paramiko.AutoAddPolicy()) #不限制白名单以外的连接
try:
sshclient.connect(host, port, username, pwd)
chan = sshclient.invoke_shell(term='xterm') #创建交互终端
chan.settimeout(0)
ids = str(int(time.time()+random.randint(1,999999999)))
sshListDict[ids] = chan
except paramiko.BadAuthenticationType:
return json.dumps({'resultCode':1,'result':'登录失败,错误的连接类型'})
except paramiko.AuthenticationException:
return json.dumps({'resultCode':1,'result':'登录失败'})
except paramiko.BadHostKeyException:
return json.dumps({'resultCode':1,'result':'登录失败,请检查IP'})
except:
return json.dumps({'resultCode':1,'result':'登录失败'})
else:
sshTimeout[ids]=time.time()
return json.dumps({'resultCode':0,'ids':ids})
# Read everything `out` wrote to stdout (presumably a subprocess.Popen
# started above this excerpt -- confirm) and strip trailing newlines to get
# the user name.
box_user = out.stdout.read().rstrip('\n')
# Obtain the password by having a shell run `echo $<pwd>`, i.e. expand the
# variable whose name is stored in `pwd`.
# NOTE(review): the command is built by string concatenation and run with
# shell=True; anything other than a plain variable name in `pwd` would be
# interpreted by the shell -- confirm `pwd` is trusted.
out = subprocess.Popen("echo $" + pwd, shell=True,
stdout=subprocess.PIPE)
box_pwd = out.stdout.read().rstrip('\n')
# Judging by the helper names, this exchanges keys in both directions:
# fetch the remote key, append the local key remotely, append the remote
# key locally. The helpers are defined elsewhere -- confirm.
try:
RemoteKey = getRemoteKey(key_gen_cmd, key_rsa_path, box_ip,
box_user, box_pwd)
appendLocalKeyInRemote(LocalKey, key_append_path, box_ip,
box_user, box_pwd)
appendRemoteKeyInLocal(RemoteKey, key_append_path, box_ip)
logging.info("Passwordless SSH has been setup b/w \
localhost & %s", box_ip)
except (paramiko.SSHException, paramiko.BadHostKeyException,
paramiko.AuthenticationException, socket.error) as e:
# The failure is logged and not re-raised.
# NOTE(review): it is logged at INFO level; consider a higher severity.
logging.info("Passwordless SSH setup failed b/w localhost & %s \
with %s, please verify host connectivity", box_ip, e)
@retry(
    Exception,
    (AuthenticationException, paramiko.AuthenticationException),
    tries=TRIES,
    delay=DELAY,
    backoff=BACKOFF,
)
def sftp_get(self, src, dest):
    """Delegate to ``SSH.sftp_get`` under the ``retry`` decorator.

    The call is forwarded unchanged with ``self``, ``src`` and ``dest``
    and its result is returned as-is. Retry behaviour is governed by
    ``TRIES``, ``DELAY`` and ``BACKOFF``; the exact meaning of the two
    positional exception arguments depends on the ``retry`` helper, which
    is defined elsewhere -- confirm there.
    """
    fetched = SSH.sftp_get(self, src, dest)
    return fetched
# and automatically add new host keys for hosts we haven't seen before.
# Install paramiko's AutoAddPolicy as the handler for unknown host keys.
self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# NOTE(review): indentation was lost in this excerpt, so the exact nesting
# of the inner `if` / `else` below must be checked against the real file.
try:
self._ssh.connect(self._host, **self._params)
except paramiko.AuthenticationException as e:
if self._interactive and 'password' not in self._params:
# If authentication has failed, and we haven't already tried
# username/password, and configuration allows it, then try
# again with username/password.
if 'username' not in self._params:
# Default to the name of the current local user.
self._params['username'] = getpass.getuser()
# Prompt for the password on the terminal.
self._params['password'] = getpass.getpass()
self._connect()
else:
# NOTE(review): this wraps `e` in a fresh AuthenticationException rather
# than re-raising the original object with a bare `raise`.
raise paramiko.AuthenticationException(e)
except Exception as e:
# Any other error is only printed; execution continues below.
print(e)
# Open the SFTP session once and keep it on the instance.
if not hasattr(self, '_sftp'):
self._sftp = self._ssh.open_sftp()
def get_conversation(self):
    '''
    Create and return a connected SSH conversation (paramiko.SSHClient).

    The client connects to self.domain on self.port with self.username and
    self.password, using paramiko's WarningPolicy for unknown host keys.

    A socket timeout causes another attempt with a fresh client. This is
    done in a loop rather than by recursion, so repeated timeouts can no
    longer exhaust the interpreter's recursion limit. Every client whose
    connect fails is closed before retrying or raising, so no transport
    is left behind.

    Raises SocksRemoteException when name resolution fails, when
    authentication fails, or when the host key is rejected.
    '''
    while True:
        conversation = paramiko.SSHClient()
        conversation.set_missing_host_key_policy(paramiko.WarningPolicy())
        try:
            conversation.connect(self.domain,
                                 port=self.port,
                                 username=self.username,
                                 password=self.password)
        except socket.gaierror:
            conversation.close()
            raise SocksRemoteException('链接代理失败')
        except paramiko.AuthenticationException:
            conversation.close()
            raise SocksRemoteException('用户认证失败')
        except paramiko.BadHostKeyException:
            conversation.close()
            raise SocksRemoteException('Host Key 验证失败')
        except socket.timeout:
            # Timed out: drop this client and try again with a new one.
            conversation.close()
            continue
        return conversation
def ssh_auth(self, ip, user, password) -> dict:
    """Attempt a single SSH password login against ``ip`` on port 22.

    Connect, authentication and banner timeouts are each 5 seconds. On a
    successful login the output of ``ifconfig`` run on the remote host is
    printed line by line.

    Returns:
        ``{"User": user, "Pass": password, "Login": flag}`` where ``flag``
        is True after a successful login and command run, and False when
        authentication fails or a socket error occurs.

    ``self.ssh`` is closed on every exit path, including socket errors and
    exceptions that are not handled here.
    """
    self.ssh.load_system_host_keys()
    self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        self.ssh.connect(ip, port=22, username=user, password=password, timeout=5, auth_timeout=5, banner_timeout=5)
        print("[+] Login {}/{} {}success{}".format(user, password, Colors.GREEN, Colors.END))
        # Only stdout is consumed; stdin and stderr are intentionally unused.
        _stdin, stdout, _stderr = self.ssh.exec_command("ifconfig")
        for line in stdout.readlines():
            print(line.strip())
        return {"User": user, "Pass": password, "Login": True}
    except paramiko.AuthenticationException:
        print("[-] Login {}/{} {}incorrect{}".format(user, password, Colors.RED, Colors.END))
        return {"User": user, "Pass": password, "Login": False}
    except socket.error:
        print("[*] Connection could not be established to {}".format(ip))
        return {"User": user, "Pass": password, "Login": False}
    finally:
        # Runs before any of the returns above takes effect.
        self.ssh.close()
# NOTE(review): this excerpt starts inside a `try:` whose header, and the
# enclosing loop that `break` / `continue` refer to, are above the visible
# lines. `except X, e:` is Python 2-only syntax.
# Attempt the connection with the current port and credentials.
self.ssh.connect(ssh_job.ip, port=int(port),
username=auth.username,
password=auth.password,
pkey=pkey,
allow_agent=ssh_job.allow_agent,
look_for_keys=ssh_job.look_for_keys,
timeout=ssh_job.timeout)
# Success: record the working port and credentials on the job and stop
# iterating.
ssh_job.port = port
ssh_job.auth = auth
found_port = port
found_auth = True
log.info("success: %s" % debug_str)
break
# Implies we've found an SSH server listening:
except paramiko.AuthenticationException, e:
# Because we stop checking ports once we find one where ssh
# is listening, we can report the error message here and it
# will end up in the final report correctly:
# `_` is presumably the gettext translation function -- confirm.
err = _("login failed")
log.error(err)
ssh_job.error = err
found_port = port
continue
# No route to host:
except socket.error, e:
log.warn("No route to host, skipping port: %s" % debug_str)
# Keep the textual form of the socket error on the job, then stop.
ssh_job.error = str(e)
break
# TODO: Hitting a live port that isn't ssh will result in