Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _conduct_login(self, process, email, password, expect_success) -> None:
process.expect_exact("Email: ")
process.sendline(email)
process.expect("Password: ")
process.sendline(password)
if expect_success:
try:
process.expect_exact(
"We strongly recommend enabling multi-factor authentication:"
)
except pexpect.exceptions.EOF:
self.fail(
"Login failed. Login error: {}".format(process.before.decode())
)
except pexpect.exceptions.TIMEOUT as pexpect_timeout:
self.fail(
"Timed out waiting for string {!r} in stream:{} ".format(
str(pexpect_timeout), process.before.decode()
)
assert child.expect("h") == 0
assert child.send("b") == 1
assert child.expect("o") == 0
assert child.send("b") == 1
assert child.expect(" ") == 0
assert child.send("b") == 1
assert child.expect("h") == 0
assert child.send("b") == 1
assert child.expect("i") == 0
assert child.send("b") == 1
with pytest.raises(pexpect.exceptions.TIMEOUT):
child.expect(".",timeout=1)
child.sendcontrol(r"m")
assert child.expect("\r\n") == 0
assert child.expect("hi\r\n") == 0
child.expect(r"\$>>> ")
child.sendcontrol(r"m")
assert child.expect("\r\n") == 0
assert child.expect("\r\n") == 0
assert child.expect("Session Finished. Press Enter.") == 0
assert child.expect("\r\n") == 0
assert child.expect(b"\x00") == 0 # NOT SURE WHAT IS SENDING THIS.
def _with_cloud_sql_proxy(self):
try:
instance_connection_string = self._get_instance_connection_string()
instance_flag = '-instances={}=tcp:5432'.format(
instance_connection_string)
process = pexpect.spawn(
self.cloud_sql_proxy_path, args=[instance_flag])
CLOUD_SQL_READY = 'Ready for new connections'
process.expect(
CLOUD_SQL_READY, timeout=5) # To avoid race condition
yield
except pexpect.exceptions.TIMEOUT:
raise CloudSqlProxyError(
'Cloud SQL Proxy was unable to start correctly')
finally:
process.kill(signal.SIGTERM)
Wrap expect to handle disconnections and fetch responses
`expected_output` can be a string or regex or list of either.
The index of the first match will be set to self._match_index and the text
between the last read of the buffer and the match will be returned.
"""
try:
self._proc.expect(expected_output, timeout=timeout)
except pexpect.exceptions.TIMEOUT:
for _ in range(10):
if not self._proc.isalive():
break
time.sleep(1)
debug_info = f'{self._proc.before} {self._proc.buffer} {self._proc.after}'
raise ExpectMoreTimeoutException(f"timed out waiting for expected output '{expected_output}'.\nBuffer: {debug_info}")
except pexpect.exceptions.EOF:
raise ExpectMoreException(f"connection to process not available.")
self.match_index = self._proc.match_index
output = self.before
self._session_script.extend(output)
return output
def _send_station_list_command(self):
"""Send a station list command."""
self._pianobar.send("s")
try:
self._pianobar.expect("Select station:", timeout=1)
except pexpect.exceptions.TIMEOUT:
# try again. Buffer was contaminated.
self._clear_buffer()
self._pianobar.send("s")
self._pianobar.expect("Select station:")
value = self._ssh.before.decode("utf-8")
if self._value_template is not None:
self._state = self._value_template.render_with_possible_json_value(
value, STATE_UNKNOWN)
else:
self._state = value
_LOGGER.debug(self._state)
except pxssh.ExceptionPxssh as err:
_LOGGER.error("Unexpected SSH error: %s", str(err))
self._disconnect()
return None
except (AssertionError, exceptions.EOF) as err:
_LOGGER.error("Connection unavailable: %s", str(err))
self._disconnect()
return None
def connect(self, ip, port):
args = ' '.join([self.client_bin_path, ip, port, self.dl_path])
try:
self.pipe = pexpect.popen_spawn.PopenSpawn(args)
except pexpect.exceptions.ExceptionPexpect:
self.error = 'Cannot open pipe'
return False
ret = self.pipe.expect(['> ', 'Couldn\'t connect to host', 'Exception:'])
if ret != 0:
if ret == 1:
self.error = 'Cannot connect'
else:
self.error = 'Cannot open download directory'
return False
return True