Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_winsize(self):
    '''
    This tests that the child process can set and get the window size.
    This makes use of an external script sigwinch_report.py.

    The window size is changed twice; each time the test waits for the
    "SIGWINCH: (rows, cols)" line printed by the child and checks both the
    reported values and the result of getwinsize(). The child is closed on
    every exit path, including a failed expectation or assertion.
    '''
    # Raw bytes literal: the byte value is the same as before, but it no
    # longer depends on the invalid escape sequence "\(" being passed
    # through unchanged (which triggers a warning on newer Pythons).
    sigwinch_pattern = br'SIGWINCH: \(([0-9]*), ([0-9]*)\)'
    timeout_message = "TIMEOUT -- this platform may not support sigwinch properly.\n"

    p1 = pexpect.spawn('%s sigwinch_report.py' % self.PYTHONBIN)
    try:
        p1.expect('READY', timeout=10)

        p1.setwinsize(11, 22)
        # TIMEOUT is listed as a pattern, so a timeout comes back as index 0
        # instead of being raised.
        index = p1.expect([pexpect.TIMEOUT, sigwinch_pattern], timeout=30)
        if index == 0:
            self.fail(timeout_message + str(p1))
        self.assertEqual(p1.match.group(1, 2), (b"11", b"22"))
        self.assertEqual(p1.getwinsize(), (11, 22))

        time.sleep(1)

        p1.setwinsize(24, 80)
        index = p1.expect([pexpect.TIMEOUT, sigwinch_pattern], timeout=10)
        if index == 0:
            self.fail(timeout_message + str(p1))
        self.assertEqual(p1.match.group(1, 2), (b"24", b"80"))
        self.assertEqual(p1.getwinsize(), (24, 80))
    finally:
        # Previously the child was only closed when every check passed.
        p1.close()
# Fragment of a larger function (its `def` is not shown; indentation was lost).
# Returns False when arming fails or a pexpect.TIMEOUT is raised; otherwise
# returns whatever mission() returns.
try:
mav.wait_heartbeat()
setup_rc(mavproxy)
# NOTE(review): homeloc is assigned but not used in the visible lines -- confirm it is needed.
homeloc = mav.location()
# Arm the motors
wait_ready_to_arm(mavproxy)
if not arm_motors(mavproxy, mav):
print("Failed to arm motors")
return False
# Perform mission
return mission(mavproxy, mav)
# A pexpect.TIMEOUT raised by any step above is reported as a failure.
except pexpect.TIMEOUT:
print("Failed: time out")
return False
# Runs on every exit path, including the early returns above: close mav and
# hand mavproxy and sitl to util.pexpect_close().
finally:
mav.close()
util.pexpect_close(mavproxy)
util.pexpect_close(sitl)
# Fragment of a larger function (its `def` is not shown; indentation was lost).
# Wait for either `pattern` followed by the gdb prompt (index 0) or a timeout
# (index 1); pexpect.TIMEOUT is listed as a pattern, so it is returned as an
# index rather than raised.
index = gdb.expect([u'{0}.+{1}'.format(pattern, gdb_prompt),
pexpect.TIMEOUT])
if index == 0:
# gdb.after now contains the whole match. Extract the text that
# matches 'pattern'.
match = re.match(pattern, gdb.after, re.DOTALL).group()
elif index == 1:
# The expect call timed out. Build a message describing the command, the
# expected pattern and the response received so far, then raise it as a
# pexpect.TIMEOUT.
error = ('Response does not match the expected pattern.\n'
'Command: {0}\n'
'Expected pattern: {1}\n'
'Response: {2}'.format(command, pattern, gdb.before))
raise pexpect.TIMEOUT(error)
# NOTE(review): with indentation lost it is unclear which `if` this `else`
# pairs with -- presumably an outer check on `pattern`; confirm.
else:
# Consume just the gdb prompt.
gdb.expect(gdb_prompt)
# NOTE(review): `match` is only assigned in the index == 0 branch above;
# presumably it is initialised before this fragment -- confirm.
return match
def setPortSpeed( self, index, speed=40000 ):
    """
    Run client_drivshell on the switch to set speed for a
    specific port
    index: port index, e.g. 1
    speed: port speed, e.g. 40000
    Returns main.TRUE once the prompt is seen again after the command
    was sent, and main.FALSE if pexpect times out. On EOF or any other
    exception the error is logged and main.cleanAndExit() is called.
    """
    try:
        # Send an empty line and wait for the prompt before issuing the
        # real command.
        self.handle.sendline( "" )
        self.handle.expect( self.prompt )
        cmd = "client_drivshell port {} sp={}".format( index, speed )
        self.handle.sendline( cmd )
        self.handle.expect( self.prompt )
        # NOTE(review): the command output ( self.handle.before ) is not
        # inspected, so success only means the prompt came back.
        return main.TRUE
    except pexpect.TIMEOUT:
        main.log.error( self.name + ": pexpect.TIMEOUT found" )
        return main.FALSE
    except pexpect.EOF:
        main.log.error( self.name + ": EOF exception found" )
        main.log.error( self.name + ": " + self.handle.before )
        main.cleanAndExit()
    except Exception:
        main.log.exception( self.name + ": Uncaught exception!" )
        main.cleanAndExit()
# Tail of a try block whose opening lines are not shown (indentation was lost).
# The `except X, msg:` and `print x` forms below are Python 2-only syntax.
securemysql.expect(expectation)
# Send "flush privileges;" and wait for "Query OK", then send "quit" and
# wait on the spawned child.
securemysql.sendline("flush privileges;")
expectation = "Query OK"
securemysql.expect(expectation)
securemysql.sendline("quit")
securemysql.wait()
logging.InstallLog.writeToFile("CyberPanel MariaDB root password changed!")
InstallCyberPanel.stdOut("CyberPanel MariaDB root password changed!")
except pexpect.EOF, msg:
logging.InstallLog.writeToFile(str(msg) + " Exception EOF [changeMYSQLRootPasswordCyberPanel]")
except pexpect.TIMEOUT, msg:
print securemysql.before
# NOTE(review): this is the TIMEOUT handler, but the logged text says
# "Exception EOF" -- looks like a copy-paste slip; confirm and correct.
logging.InstallLog.writeToFile(str(msg) + " Exception EOF [changeMYSQLRootPasswordCyberPanel]")
# NOTE(review): BaseException also catches KeyboardInterrupt and SystemExit;
# here they are only written to the install log.
except BaseException, msg:
logging.InstallLog.writeToFile(str(msg) + "[changeMYSQLRootPasswordCyberPanel]")
return 0
# Fragment of a larger method (its `def` is not shown; indentation was lost).
# When hard_reset is set the SAGE process is killed outright; otherwise an
# interrupt-based soft reset is attempted, with kill_sage() as the fallback.
# The `except X, msg:` forms below are Python 2-only syntax.
if hard_reset:
log.msg(LOG_PREFIX % self.id + 'Performing hard reset.')
self.kill_sage()
else: # try for a soft reset
# Up to INTERRUPT_TRIES attempts, each waiting `timeout` seconds for the prompt.
INTERRUPT_TRIES = 20
timeout = 0.3
# `e` refers to the same object as self.sage._expect used for sendline below.
e = self.sage._expect
try:
for i in range(INTERRUPT_TRIES):
self.sage._expect.sendline('q')
self.sage._expect.sendline(chr(3)) # send ctrl-c
try:
e.expect(self.sage._prompt, timeout=timeout)
success = True
break
except (pexpect.TIMEOUT, pexpect.EOF), msg:
success = False
if self.log_level > 3:
# The caught exception bound to `msg` is replaced by a log message here.
msg = 'Interrupting SAGE (try %s)' % i
log.msg(LOG_PREFIX % self.id + msg)
# Any other exception is logged and leaves `success` False.
except Exception, msg:
success = False
log.msg(msg)
log.msg(LOG_PREFIX % self.id + "Performing hard reset.")
# A failed soft reset falls back to kill_sage(); otherwise self.sage.reset() is called.
if not success:
self.kill_sage()
else:
self.sage.reset()
except pxssh.ExceptionPxssh:
pass
else:
self.gem5.kill()
raise DeviceError("Failed to connect to the gem5 telnet session.")
# Fragment of a larger method (its `def` is not shown; indentation was lost).
self.logger.info("Connected! Waiting for prompt...")
# We need to find the prompt. It might be different if we are resuming
# from a checkpoint. Therefore, we test multiple options here.
# NOTE(review): both TIMEOUT handlers below simply retry, and no attempt
# limit is visible, so this loop keeps going until one of the prompts is seen.
prompt_found = False
while not prompt_found:
try:
self.login_to_device()
except TIMEOUT:
pass
try:
# Try and force a prompt to be shown
self.sckt.send('\n')
# Accept any of three candidate prompt patterns, waiting up to 60 seconds.
self.sckt.expect([r'# ', self.sckt.UNIQUE_PROMPT, r'\[PEXPECT\][\\\$\#]+ '], timeout=60)
prompt_found = True
except TIMEOUT:
pass
self.logger.info("Setting unique prompt...")
self.sckt.set_unique_prompt()
self.sckt.prompt()
self.logger.info("Prompt found and replaced with a unique string")
# We check that the prompt is what we think it should be. If not, we
'some_password'}
:param extra_update_fields: a dict used to specify DB fields which should
be updated on the underlying model
object after execution completes
:param idle_timeout a timeout (in seconds); if new output is not
sent to stdout in this interval, the process
will be terminated
:param job_timeout a timeout (in seconds); if the total job runtime
exceeds this, the process will be killed
:param pexpect_timeout a timeout (in seconds) to wait on
`pexpect.spawn().expect()` calls
:param proot_cmd the command used to isolate processes, `bwrap`
Returns a tuple (status, return_code) i.e., `('successful', 0)`
'''
# Add pexpect.TIMEOUT and pexpect.EOF as extra keys mapped to None.
# NOTE(review): these assignments modify the expect_passwords object in place
# (the caller's object, if it was passed in) before the OrderedDict copy
# below is made -- confirm that is acceptable.
expect_passwords[pexpect.TIMEOUT] = None
expect_passwords[pexpect.EOF] = None
if not isinstance(expect_passwords, collections.OrderedDict):
# We iterate over `expect_passwords.keys()` and
# `expect_passwords.values()` separately to map matched inputs to
# patterns and choose the proper string to send to the subprocess;
# enforce usage of an OrderedDict so that the ordering of elements in
# `keys()` matches `values()`.
expect_passwords = collections.OrderedDict(expect_passwords)
# Parallel lists: position i in password_patterns corresponds to position i
# in password_values.
password_patterns = list(expect_passwords.keys())
password_values = list(expect_passwords.values())
logger.debug('Launch Command')
logger.debug(args)
logger.debug('With Environment')
logger.debug(env)
# Fragment of a larger method (its `def` is not shown; indentation was lost,
# and the fragment is cut off inside the final while loop).
# NOTE(review): the bare `print "..."` statements below are Python 2-only syntax.
self.handle.sendline(self.cmd)
# Fall back to 10 seconds when self.timeout is unset or falsy.
timeoutVar = self.timeout if self.timeout else 10
# Index mapping: 0 = prompt, 1 = "byte <digits>", 2 = 'Command not found.',
# 3 = TIMEOUT, 4 = "\n:", 5 = EOF.
# NOTE(review): no branch for index 5 (EOF) appears in the visible lines.
index = self.handle.expect([self.prompt, "byte\s\d+", 'Command not found.', pexpect.TIMEOUT,"\n:",pexpect.EOF], timeout = timeoutVar)
if index == 0:
self.LASTRSP = self.LASTRSP + self.handle.before
#print "Expected Prompt Found"
elif index == 1:
self.LASTRSP = self.LASTRSP + self.handle.before
self.handle.send("\r")
print("Found More screen to go , Sending a key to proceed")
# NOTE(review): unlike the expect inside the loop below, this call lists
# neither EOF nor TIMEOUT, so either one would be raised here -- confirm intended.
indexMore = self.handle.expect(["byte\s\d+", self.prompt], timeout = timeoutVar)
# Keep sending a carriage return while another "byte <digits>" marker shows
# up; prompt, EOF and TIMEOUT all end the loop.
while indexMore == 0:
print "Found another More screen to go , Sending a key to proceed"
self.handle.send("\r")
indexMore = self.handle.expect(["byte\s\d+", self.prompt,pexpect.EOF,pexpect.TIMEOUT], timeout = timeoutVar)
self.LASTRSP = self.LASTRSP + self.handle.before
#print self.LASTRSP
elif index ==2:
print "Command not found"
self.LASTRSP = self.LASTRSP + self.handle.before
elif index ==3:
print "Expected Prompt not found , Time Out!!"
return False
elif index == 4:
self.LASTRSP = self.LASTRSP + self.handle.before
# "\n:" was matched; answer it with Ctrl-D and repeat while it keeps appearing.
self.handle.sendcontrol("D")
#print "AA"*89
indexMore = self.handle.expect(["\n:", self.prompt,pexpect.EOF,pexpect.TIMEOUT], timeout = timeoutVar)
while indexMore == 0:
self.handle.sendcontrol("D")
],
timeout=5)
# Fragment of a larger method (the `def`, the opening `try:` and the first
# branch are not shown; indentation was lost). Every visible path returns a
# string: 'OK' on success, otherwise a description of what went wrong.
# i == 1: per the message returned below, the ssh session asked for a
# password. Force-close the child, call stty_sane(), and report it.
elif i == 1:
p.close(force=True)
from .remote import stty_sane
stty_sane()
return f'ssh connection to {self.address} was prompted for password. Please set up public key authentication to the remote host before continue.'
elif i == 2:
# close() is called before p.exitstatus is read.
p.close()
if p.exitstatus == 0:
return 'OK'
# Non-zero exit: prefer the captured output, else a generic message.
elif p.before:
return p.before.decode()
else:
return f'Command "{cmd}" exits with code {p.exitstatus}'
except pexpect.TIMEOUT:
# NOTE(review): p.before is decoded as bytes above, so str(p.before) here
# presumably renders as "b'...'" in the message -- confirm and decode if so.
return f'ssh connection to {self.address} time out with prompt: {str(p.before)}'
except Exception as e:
return f'Failed to check remote connection {self.address}:{self.port}: {e}'
return "OK"