Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_trasaction_syntax_error(local_process):
local_process.sendline("multi")
local_process.sendline("get foo 1")
local_process.expect("wrong number of arguments for 'get' command")
local_process.expect("transaction")
local_process.sendline("EXEC")
local_process.expect("Transaction discarded because of previous errors.")
with pytest.raises(pexpect.exceptions.TIMEOUT):
local_process.expect("transaction")
def _wait(self, timeout=5):
try:
self.process.expect(EOF, timeout=timeout)
except TIMEOUT:
raise Failure(_("timed out while waiting for program to exit")) from TIMEOUT(timeout)
except UnicodeDecodeError:
raise Failure(_("output not valid ASCII text"))
self.kill()
if self.process.signalstatus == signal.SIGSEGV:
raise Failure(_("failed to execute program due to segmentation fault"))
self.exitcode = self.process.exitstatus
return self
index = child.expect(
['[Pp]assword.*: ', '.* done\.'], timeout=30
)
if index == 0:
child.sendline(password)
self.logger.info('Call: Password entered')
stdout = stderr = child.read().decode()
exit_code = child.wait()
child.close()
self.logger.info(stdout)
else:
proc = komand.helper.exec_command(command)
exit_code = proc['rcode']
stderr = proc['stderr'].decode()
stdout = proc['stdout'].decode()
except pexpect.exceptions.TIMEOUT:
raise TimeoutError(
'Timeout occurred for "{}". Please make sure that the Git '
'repository is available and that the provided credentials '
'are correct. If the issue persists, please contact Komand '
'support'.format(command)
)
except Exception as e:
self.logger.error('Call: Unexpected exception: {}'.format(str(e)))
raise e
if exit_code != 0:
raise OSError(
'Command execution failed: {}\nExit code: {}\n{}'.format(
command, exit_code, stderr
)
)
return []
if not isinstance(patterns, list):
patterns = [patterns]
# Allow dot to match \n
compile_flags = re.DOTALL
if self.ignorecase:
compile_flags = compile_flags | re.IGNORECASE
compiled_pattern_list = []
for idx, p in enumerate(patterns):
if isinstance(p, self.allowed_string_types):
p = self._coerce_expect_string(p)
compiled_pattern_list.append(re.compile(p, compile_flags))
elif p is EOF:
compiled_pattern_list.append(EOF)
elif p is TIMEOUT:
compiled_pattern_list.append(TIMEOUT)
elif isinstance(p, type(re.compile(''))):
compiled_pattern_list.append(p)
else:
self._pattern_type_err(p)
return compiled_pattern_list
:class:`check50.Failure` if stdout is empty
:type prompt: bool
:param timeout: maximum number of seconds to wait for prompt
:type timeout: int / float
:raises check50.Failure: if ``prompt`` is set to True and no prompt is given
"""
if line == EOF:
log("sending EOF...")
else:
log(_("sending input {}...").format(line))
if prompt:
try:
self.process.expect(".+", timeout=timeout)
except (TIMEOUT, EOF):
raise Failure(_("expected prompt for input, found none"))
except UnicodeDecodeError:
raise Failure(_("output not valid ASCII text"))
# Consume everything on the output buffer
try:
for _i in range(int(timeout * 10)):
self.process.expect(".+", timeout=0.1)
except (TIMEOUT, EOF):
pass
try:
if line == EOF:
self.process.sendeof()
else:
self.process.sendline(line)
def turn_off(self):
"""Turn the media player off."""
if self._pianobar is None:
_LOGGER.info("Pianobar subprocess already stopped")
return
self._pianobar.send("q")
try:
_LOGGER.debug("Stopped Pianobar subprocess")
self._pianobar.terminate()
except pexpect.exceptions.TIMEOUT:
# kill the process group
os.killpg(os.getpgid(self._pianobar.pid), signal.SIGTERM)
_LOGGER.debug("Killed Pianobar subprocess")
self._pianobar = None
self._player_state = STATE_OFF
self.schedule_update_ha_state()
# calls. In this case the subprocess we are calling will handle
# closing of old connections.
pass
instance_connection_string = '{0}:{1}:{2}'.format(
project_id, region, instance_name)
instance_flag = '-instances={}=tcp:{}'.format(
instance_connection_string, port)
if cloud_sql_proxy_path is None:
cloud_sql_proxy_path = shutil.which('cloud_sql_proxy')
assert cloud_sql_proxy_path, 'could not find cloud_sql_proxy_path'
process = popen_spawn.PopenSpawn([cloud_sql_proxy_path, instance_flag])
try:
# Make sure cloud sql proxy is started before doing the real work
process.expect('Ready for new connections', timeout=60)
yield
except pexpect.exceptions.TIMEOUT:
raise DatabaseError(
('Cloud SQL Proxy was unable to start after 60 seconds. Output '
'of cloud_sql_proxy: \n{}').format(process.before))
except pexpect.exceptions.EOF:
raise DatabaseError(
('Cloud SQL Proxy exited unexpectedly. Output of '
'cloud_sql_proxy: \n{}').format(process.before))
finally:
process.kill(signal.SIGTERM)
spawn.before = spawn.buffer
spawn.after = TIMEOUT
index = self.searcher.timeout_index
if index >= 0:
spawn.match = TIMEOUT
spawn.match_index = index
return index
else:
spawn.match = None
spawn.match_index = None
msg = str(spawn)
msg += '\nsearcher: %s' % self.searcher
if err is not None:
msg = str(err) + '\n' + msg
exc = TIMEOUT(msg)
exc.__cause__ = None # in Python 3.x we can use "raise exc from None"
raise exc
def timeout(self, err=None):
spawn = self.spawn
spawn.before = spawn.buffer
spawn.after = TIMEOUT
index = self.searcher.timeout_index
if index >= 0:
spawn.match = TIMEOUT
spawn.match_index = index
return index
else:
spawn.match = None
spawn.match_index = None
msg = str(spawn)
msg += '\nsearcher: %s' % self.searcher
if err is not None:
msg = str(err) + '\n' + msg
exc = TIMEOUT(msg)
exc.__cause__ = None # in Python 3.x we can use "raise exc from None"
raise exc