Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
except Exception, e:
print '*** Unable to open host keys file (%s)' % filename
return
for line in f:
keylist = line.split(' ')
if len(keylist) != 3:
continue
hostlist, keytype, key = keylist
hosts = hostlist.split(',')
for host in hosts:
if not keys.has_key(host):
keys[host] = {}
if keytype == 'ssh-rsa':
keys[host][keytype] = paramiko.RSAKey(data=base64.decodestring(key))
elif keytype == 'ssh-dss':
keys[host][keytype] = paramiko.DSSKey(data=base64.decodestring(key))
f.close()
return keys
def get_authorized_keys(path):
keys = []
with open(path) as fp:
for line in fp:
flds = line.split(' ')
if len(flds) < 2: continue
if flds[0] == 'ssh-rsa':
f = paramiko.RSAKey
elif flds[0] == 'ssh-dss':
f = paramiko.DSSKey
elif flds[0].startswith('ecdsa-'):
f = paramiko.ECDSAKey
else:
continue
data = decodebytes(flds[1].encode('ascii'))
keys.append(f(data=data))
return keys
def get_dss_key_from_string(dsskeystr=None, password=None):
if hasattr(paramiko.DSSKey, 'write_private_key'):
fd = StringIO.StringIO()
fd.write(dsskeystr)
fd.seek(0L)
return paramiko.DSSKey(file_obj=fd, password=password)
# if paramiko <= 1.6
else:
return _get_dss_key_from_string(dsskeystr, password)
def ssh_pubkey_gen(private_key=None, username='jumpserver', hostname='localhost', password=None):
if isinstance(private_key, bytes):
private_key = private_key.decode("utf-8")
if isinstance(private_key, string_types):
private_key = ssh_key_string_to_obj(private_key, password=password)
if not isinstance(private_key, (paramiko.RSAKey, paramiko.DSSKey)):
raise IOError('Invalid private key')
public_key = "%(key_type)s %(key_content)s %(username)s@%(hostname)s" % {
'key_type': private_key.get_name(),
'key_content': private_key.get_base64(),
'username': username,
'hostname': hostname,
}
return public_key
def ssh_key_string_to_obj(text, password=None):
key = None
try:
key = paramiko.RSAKey.from_private_key(StringIO(text), password=password)
except paramiko.SSHException:
pass
try:
key = paramiko.DSSKey.from_private_key(StringIO(text), password=password)
except paramiko.SSHException:
pass
return key
try:
ssh_client = paramiko.SSHClient()
# Always auto-add remote SSH host keys.
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_client.load_system_host_keys()
# Connect using paramiko with a timeout parameter (requires paramiko 1.7)
if ssh_keys:
pkeys = []
for key in ssh_keys:
logging.debug('Using SSH private key for device authentication.')
# Use a virtual temporary file to store the key.
ssh_key_fileobj = cStringIO.StringIO()
ssh_key_fileobj.write(key)
ssh_key_fileobj.reset()
try:
pkeys.append(paramiko.DSSKey(file_obj=ssh_key_fileobj))
logging.debug('Using SSH DSA key for %r', hostname)
except (IndexError, paramiko.SSHException) as e:
if (isinstance(e, IndexError) or
'not a valid DSA private key file' in str(e)):
ssh_key_fileobj.reset()
try:
logging.debug('Using SSH RSA key for %r', hostname)
pkeys.append(paramiko.RSAKey(file_obj=ssh_key_fileobj))
except (IndexError, paramiko.SSHException) as e:
raise exceptions.AuthenticationError(str(e))
else:
raise exceptions.ConnectError('SSHException: %s' % str(e))
else:
logging.debug('Using password for %r', hostname)
pkeys = [None]
for pkey in pkeys:
access to the server will know the host is legitimate.
"""
raise NotImplementedError
def replace_host_key(self, hostname, old_key, new_key):
"""Replaces a host key in the known hosts list with another.
This is used for replacing host keys that have changed.
"""
raise NotImplementedError
class FileSSHStorage(SSHStorage):
DEFAULT_KEY_FILES = (
(paramiko.RSAKey, 'id_rsa'),
(paramiko.DSSKey, 'id_dsa'),
)
SSH_DIRS = ('.ssh', 'ssh')
_ssh_dir = None
def get_user_key_info(self):
for cls, filename in self.DEFAULT_KEY_FILES:
# Paramiko looks in ~/.ssh and ~/ssh, depending on the platform,
# so check both.
for sshdir in self.SSH_DIRS:
path = os.path.join(self.get_ssh_dir(sshdir), filename)
if os.path.isfile(path):
return cls, path
def compute_fingerprint(self):
data = base64.b64decode(self.key)
if self.key_type == "ssh-rsa":
pkey = RSAKey(data=data)
elif self.key_type == "ssh-dss":
pkey = DSSKey(data=data)
return ":".join(re.findall(r"..", hexlify(pkey.get_fingerprint())))
def _check(self):
fake = StringIO(self.password)
if "RSA PRIVATE KEY" in self.password:
key = paramiko.RSAKey.from_private_key(fake)
elif "DSA PRIVATE KEY" in self.password:
key = paramiko.DSSKey.from_private_key(fake)
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.MissingHostKeyPolicy()) # ignore unknown hosts
c.connect(hostname=self.target.host, port=self.target.port, username=self.username, pkey=key)
stdin, stdout, stderr = c.exec_command('uname -a')
evidence = stdout.readlines()[0]
c.close()
self.password = 'Private Key'
return evidence
# else, try to load from file
keyfile = self.public_key
if keyfile.endswith('.pub'):
keyfile = keyfile[:-4]
else:
gc3libs.log.warning(
"Option `public_key` in configuration file should contain"
" the path to a public key file (with `.pub` ending),"
" but `%s` was found instead. Continuing anyway.",
self.public_key)
privkey = None
local_fingerprints = []
for format, privkey_reader, pubkey_reader in [
('DSS', paramiko.DSSKey.from_private_key_file,
Crypto.PublicKey.DSA.importKey),
('RSA', paramiko.RSAKey.from_private_key_file,
Crypto.PublicKey.RSA.importKey),
]:
try:
gc3libs.log.debug(
"Trying to load key file `%s` as SSH %s key...",
keyfile, format)
privkey = privkey_reader(keyfile)
gc3libs.log.info(
"Successfully loaded key file `%s` as SSH %s key.",
keyfile, format)
# compute public key fingerprints, for comparing them
# with the remote one
localkey_fingerprints = [
# Usual SSH key fingerprint, computed like