Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
# Generate an RSA key and add it to the server
self.rsa_key = rsa_key = paramiko.RSAKey.generate(bits=512)
self.addKeyToServer(rsa_key)
# Write the key to a file
key_fd, self.key_file = tempfile.mkstemp()
os.close(key_fd)
rsa_key.write_private_key_file(self.key_file)
#with open(key_fd, 'w+') as key_handle:
# Create an empty config file
_, self.config_file = tempfile.mkstemp()
def test__get_pkey_rsa(self):
private_rsa_key = six.StringIO()
private_rsa_key_obj = paramiko.RSAKey.generate(1024)
private_rsa_key_obj.write_private_key(private_rsa_key)
private_rsa_key.seek(0)
ssh = sshutils.SSH("root", "example.net")
self.assertIsInstance(ssh._get_pkey(private_rsa_key),
paramiko.RSAKey)
private_rsa_key.seek(0)
self.assertIsInstance(ssh._get_pkey(private_rsa_key.getvalue()),
paramiko.RSAKey)
def generate_keys():
"""
Generatees a public and private key
"""
key = paramiko.RSAKey.generate(2048)
privateString = StringIO()
key.write_private_key(privateString)
return key.get_base64(), privateString.getvalue()
def create_ssh_keypair(comment=None, bits=2048):
"""Generate an ssh keypair for use on the overcloud"""
if comment is None:
comment = "Generated by TripleO"
key = paramiko.RSAKey.generate(bits)
keyout = six.StringIO()
key.write_private_key(keyout)
private_key = keyout.getvalue()
public_key = '{} {} {}'.format(key.get_name(), key.get_base64(), comment)
return {
'private_key': private_key,
'public_key': public_key,
}
def generate_rsa_key():
return paramiko.RSAKey.generate(2048)
def generate_user_key(self, bits=2048):
"""Generates a new RSA keypair for the user running Review Board.
This will store the new key in the backend storage and return the
resulting key as an instance of :py:mod:`paramiko.RSAKey`.
If a key already exists, it's returned instead.
Callers are expected to handle any exceptions. This may raise
IOError for any problems in writing the key file, or
paramiko.SSHException for any other problems.
"""
key = self.get_user_key()
if not key:
key = paramiko.RSAKey.generate(bits)
self._write_user_key(key)
return key
def generate_ssh_keys(self, password=None):
"""
Set the C{ssh_public_key} and C{ssh_private_key} attributes to be
new keys. Note that the keys are stored in base64.
@parameter password: password to use to encrypt the private key
@type password: string
"""
from paramiko import RSAKey
from StringIO import StringIO
key = RSAKey.generate(SSH_KEY_SIZE)
output = StringIO()
key.write_private_key(output, password=password)
self.ssh_private_key = output.getvalue()
output.close()
self.ssh_public_key = \
"ssh-rsa %s auto-generated Expedient key" % (key.get_base64())
def generate_host_key(self, filename):
""" Generate server host key to local filepath ``filename``. """
from paramiko import RSAKey
bits = 4096
if self.config.has_option('ssh', 'HostKeyBits'):
bits = self.config.getint('ssh', 'HostKeyBits')
# generate private key and save,
self.log.info('Generating {bits}-bit RSA public/private keypair.'
.format(bits=bits))
priv_key = RSAKey.generate(bits=bits)
priv_key.write_private_key_file(filename, password=None)
self.log.debug('{filename} saved.'.format(filename=filename))
# save public key,
pub = RSAKey(filename=filename, password=None)
with open('{0}.pub'.format(filename,), 'w') as fp:
fp.write("{0} {1}".format(pub.get_name(), pub.get_base64()))
self.log.debug('{filename}.pub saved.'.format(filename=filename))
return priv_key
def generate_key(host: str, key_type: KeyType = KeyType.RSA) -> str:
"""Generates a new private-public key pair and returns the path
to the private key.
Private key is saved in a directory determined by
:func:`get_free_private_key_location`.
:param host: Host name for identification purposes.
:param key_type: Key type to generate.
"""
if key_type != KeyType.RSA:
raise NotImplementedError("Only RSA keys are supported for now.")
key = RSAKey.generate(bits=RSA_BITS)
private_key_location = get_free_private_key_location(key_type=key_type)
key.write_private_key_file(filename=private_key_location)
public_key = RSAKey(filename=private_key_location)
now_down_to_minutes = datetime.datetime.now().isoformat()[:16]
comment = "idact/{host}/{now_down_to_minutes}".format(
host=host,
now_down_to_minutes=now_down_to_minutes)
public_key_value = "{name} {base64} {comment}".format(
name=public_key.get_name(),
base64=public_key.get_base64(),
comment=comment)
sock = socket.socket()
sock.connect((args.hostname, args.port))
# instantiate transport
transport = paramiko.transport.Transport(sock)
try:
transport.start_client()
except paramiko.ssh_exception.SSHException:
# server was likely flooded, retry up to 3 times
transport.close()
if tried < 4:
tried += 1
return checkUsername(username, tried)
else:
print('[-] Failed to negotiate SSH transport')
try:
transport.auth_publickey(username, paramiko.RSAKey.generate(1024))
except BadUsername:
return (username, False)
except paramiko.ssh_exception.AuthenticationException:
return (username, True)
#Successful auth(?)
raise Exception("There was an error. Is this the correct version of OpenSSH?")