Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# indefinitely, creating a GC cycle and not letting Transport ever be
# GC'd. it's a bug in Thread.)
# Hold reference to 'sys' so we can test sys.modules to detect
# interpreter shutdown.
self.sys = sys
# active=True occurs before the thread is launched, to avoid a race
_active_threads.append(self)
if self.server_mode:
self._log(DEBUG, 'starting thread (server mode): %s' % hex(long(id(self)) & xffffffff))
else:
self._log(DEBUG, 'starting thread (client mode): %s' % hex(long(id(self)) & xffffffff))
try:
try:
self.packetizer.write_all(b(self.local_version + '\r\n'))
self._log(DEBUG, 'Local version/idstring: %s' % self.local_version)
self._check_banner()
# The above is actually very much part of the handshake, but
# sometimes the banner can be read but the machine is not
# responding, for example when the remote ssh daemon is loaded
# into memory but we cannot read from the disk/spawn a new
# shell.
# Make sure we can specify a timeout for the initial handshake.
# Re-use the banner timeout for now.
self.packetizer.start_handshake(self.handshake_timeout)
self._send_kex_init()
self._expect_packet(MSG_KEXINIT)
while self.active:
if self.packetizer.need_rekey() and not self.in_kex:
self._send_kex_init()
"""
Return a "hashed" form of the hostname, as used by OpenSSH when storing
hashed hostnames in the known_hosts file.
:param str hostname: the hostname to hash
:param str salt: optional salt to use when hashing (must be 20 bytes long)
:return: the hashed hostname as a `str`
"""
if salt is None:
salt = os.urandom(sha1().digest_size)
else:
if salt.startswith('|1|'):
salt = salt.split('|')[2]
salt = decodebytes(b(salt))
assert len(salt) == sha1().digest_size
hmac = HMAC(salt, b(hostname), sha1).digest()
hostkey = '|1|%s|%s' % (u(encodebytes(salt)), u(encodebytes(hmac)))
return hostkey.replace('\n', '')
"""
Return a "hashed" form of the hostname, as used by OpenSSH when storing
hashed hostnames in the known_hosts file.
:param str hostname: the hostname to hash
:param str salt: optional salt to use when hashing (must be 20 bytes long)
:return: the hashed hostname as a `str`
"""
if salt is None:
salt = os.urandom(sha1().digest_size)
else:
if salt.startswith('|1|'):
salt = salt.split('|')[2]
salt = decodebytes(b(salt))
assert len(salt) == sha1().digest_size
hmac = HMAC(salt, b(hostname), sha1).digest()
hostkey = '|1|%s|%s' % (u(encodebytes(salt)), u(encodebytes(hmac)))
return hostkey.replace('\n', '')
def asbytes(s):
    """
    Coerce ``s`` into a bytes value.

    Values that are already one of ``bytes_types`` are returned untouched,
    values that are one of ``string_types`` are converted with ``b()``, and
    anything else is asked to convert itself through its own ``asbytes()``
    method.

    :param s: the value to coerce
    :return: ``s`` itself, ``b(s)``, or the result of ``s.asbytes()``
    :raises Exception: with the message ``'Unknown type'`` when the
        ``s.asbytes()`` call fails for any reason (including ``s`` having
        no such method)
    """
    # Already bytes: nothing to do.
    if isinstance(s, bytes_types):
        return s
    # Text: convert through the compatibility helper.
    if isinstance(s, string_types):
        return b(s)
    # Last resort: let the object serialize itself.
    try:
        return s.asbytes()
    except Exception:
        raise Exception('Unknown type')
def safe_string(s):
    """
    Return a copy of ``s`` in which every non-printable byte is replaced
    by a ``%XX`` escape (two uppercase hex digits).

    Only printable ASCII, i.e. values 32 (space) through 126 (``~``), is
    passed through unchanged. Everything else, including 127 (DEL), is
    escaped.

    :param s: iterable whose items are accepted by ``byte_ord``
    :return: the escaped value, joined onto ``b('')``
    """
    pieces = []
    for c in s:
        i = byte_ord(c)
        # 127 is DEL, a control character, so the printable range must
        # stop at 126 rather than include it.
        if 32 <= i < 127:
            pieces.append(byte_chr(i))
        else:
            pieces.append(b('%%%02X' % i))
    # Join once at the end instead of growing the result with repeated
    # concatenation.
    return b('').join(pieces)
def _compute_key(self, id, nbytes):
"""id is 'A' - 'F' for the various keys used by ssh"""
m = Message()
m.add_mpint(self.K)
m.add_bytes(self.H)
m.add_byte(b(id))
m.add_bytes(self.session_id)
# Fall back to SHA1 for kex engines that fail to specify a hash
# algorithm, or for e.g. transport tests that don't run kexinit.
hash_algo = getattr(self.kex_engine, 'hash_algo', None)
hash_select_msg = "kex engine {} specified hash_algo {!r}".format(
self.kex_engine.__class__.__name__, hash_algo,
)
if hash_algo is None:
hash_algo = sha1
hash_select_msg += ", falling back to sha1"
if not hasattr(self, '_logged_hash_selection'):
self._log(DEBUG, hash_select_msg)
setattr(self, '_logged_hash_selection', True)
out = sofar = hash_algo(m.asbytes()).digest()
while len(out) < nbytes:
m = Message()
def _send_handle_response(self, request_number, handle, folder=False):
    """
    Reply to a request that was expected to produce a handle.

    When ``handle`` is not an ``SFTPHandle`` it is treated as an error
    code and forwarded through ``_send_status``. Otherwise the handle is
    given a fresh ``hx<N>`` name taken from ``self.next_handle``, stored
    in the folder or file table, and its name is sent back in a
    ``CMD_HANDLE`` response.

    :param request_number: identifier of the request being answered
    :param handle: an ``SFTPHandle``, or an error code
    :param bool folder: store the handle in ``self.folder_table`` when
        true, otherwise in ``self.file_table``
    """
    is_handle = issubclass(type(handle), SFTPHandle)
    if not is_handle:
        # Not a handle object, so it must be an error code.
        self._send_status(request_number, handle)
        return

    # Name the handle after the current counter value, then advance the
    # counter for the next handle.
    handle._set_name(b('hx%d' % self.next_handle))
    self.next_handle += 1

    # Register the handle under its name in the matching table.
    table = self.folder_table if folder else self.file_table
    table[handle._get_name()] = handle

    self._response(request_number, CMD_HANDLE, handle._get_name())
def safe_string(s):
    """
    Return a copy of ``s`` in which every non-printable byte is replaced
    by a ``%XX`` escape (two uppercase hex digits).

    Only printable ASCII, i.e. values 32 (space) through 126 (``~``), is
    passed through unchanged. Everything else, including 127 (DEL), is
    escaped.

    :param s: iterable whose items are accepted by ``byte_ord``
    :return: the escaped value, joined onto ``b('')``
    """
    pieces = []
    for c in s:
        i = byte_ord(c)
        # 127 is DEL, a control character, so the printable range must
        # stop at 126 rather than include it.
        if 32 <= i < 127:
            pieces.append(byte_chr(i))
        else:
            pieces.append(b('%%%02X' % i))
    # Join once at the end instead of growing the result with repeated
    # concatenation.
    return b('').join(pieces)
raise SSHException("Invalid key")
public_keys = []
for _ in range(num_keys):
pubkey = Message(message.get_binary())
if pubkey.get_text() != "ssh-ed25519":
raise SSHException("Invalid key")
public_keys.append(pubkey.get_binary())
private_ciphertext = message.get_binary()
if ciphername == "none":
private_data = private_ciphertext
else:
cipher = Transport._cipher_info[ciphername]
key = bcrypt.kdf(
password=b(password),
salt=bcrypt_salt,
desired_key_bytes=cipher["key-size"] + cipher["block-size"],
rounds=bcrypt_rounds,
# We can't control how many rounds are on disk, so no sense
# warning about it.
ignore_few_rounds=True,
)
decryptor = Cipher(
cipher["class"](key[: cipher["key-size"]]),
cipher["mode"](key[cipher["key-size"] :]),
backend=default_backend(),
).decryptor()
private_data = (
decryptor.update(private_ciphertext) + decryptor.finalize()
)
def _read_private_key_openssh(self, lines, password):
"""
Read the new OpenSSH SSH2 private key format available
since OpenSSH version 6.5
Reference:
https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.key
"""
try:
data = decodebytes(b("".join(lines)))
except base64.binascii.Error as e:
raise SSHException("base64 decoding error: {}".format(e))
# read data struct
auth_magic = data[:15]
if auth_magic != OPENSSH_AUTH_MAGIC:
raise SSHException("unexpected OpenSSH key header encountered")
cstruct = self._uint32_cstruct_unpack(data[15:], "sssur")
cipher, kdfname, kdf_options, num_pubkeys, remainder = cstruct
# For now, just support 1 key.
if num_pubkeys > 1:
raise SSHException(
"unsupported: private keyfile has multiple keys"
)
pubkey, privkey_blob = self._uint32_cstruct_unpack(remainder, "ss")