Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_run(self):
ret, out = run("echo hello")
self.assertEqual(ret, 0)
self.assertEqual(out, b"hello\n")
ret, out = run(["echo", "'hello'"])
self.assertEqual(ret, 0)
self.assertEqual(out, b"'hello'\n")
ret, out = run(["echo", "\" ' "])
self.assertEqual(ret, 0)
self.assertEqual(out, b"\" ' \n")
# test a longer output that needs to be read in several chunks
ret, out = run("echo -n '%s'; sleep 0.2; echo -n '%s'" % (10000 * "x", 10 * "a"), logfile=self.tmp_file, can_fail=True)
self.assertEqual(ret, 0)
self.assertEqual(out, 10000 * b"x" + 10 * b"a")
# check if log file is written properly; it is supposed to append data to existing content
def test_run_show_cmd_logfile_stdout(self, mock_out):
logfile = os.path.join(self.tmp_dir, 'output.log')
ret, out = run("echo foo", show_cmd=True, logfile=logfile, stdout=True)
self.assertEqual(ret, 0)
self.assertEqual(out, b'foo\n')
with open(logfile) as f:
self.assertEqual(f.read(),
'COMMAND: echo foo\n-----------------\nfoo\n')
self.assertEqual(mock_out.getvalue(),
'COMMAND: echo foo\n-----------------\nfoo\n')
def test_run_in_text_mode(self):
"""test run with kwargs 'text', 'encoding' or/and 'errors' set. These kwargs are
added to Popen from python3.6(encoding, errors) and python3.7(text). Running test
with python version older than 3.7 is expected to fail
"""
ret, out = run("echo hello", text=True)
self.assertEqual(ret, 0)
self.assertEqual(out, "hello\n")
ret, out = run("echo hello", encoding="utf-8")
self.assertEqual(ret, 0)
self.assertEqual(out, "hello\n")
ret, out = run("echo hello", errors="strict")
self.assertEqual(ret, 0)
self.assertEqual(out, "hello\n")
def test_run_stdout(self, mock_out):
ret, out = run("echo foo", stdout=True)
self.assertEqual(ret, 0)
self.assertEqual(out, b'foo\n')
self.assertEqual(mock_out.getvalue(),
'foo\n')
self.assertEqual(out, b"\" ' \n")
# test a longer output that needs to be read in several chunks
ret, out = run("echo -n '%s'; sleep 0.2; echo -n '%s'" % (10000 * "x", 10 * "a"), logfile=self.tmp_file, can_fail=True)
self.assertEqual(ret, 0)
self.assertEqual(out, 10000 * b"x" + 10 * b"a")
# check if log file is written properly; it is supposed to append data to existing content
self.assertEqual("\n".join(read_from_file(self.tmp_file)), "test" + 10000 * "x" + 10 * "a")
ret, out = run("exit 1", can_fail=True)
self.assertEqual(ret, 1)
self.assertRaises(RuntimeError, run, "exit 1")
# stdin test
ret, out = run("xargs -0 echo -n", stdin_data=b"\0".join([str(i).encode() for i in range(10000)]))
self.assertEqual(out, b" ".join([str(i).encode() for i in range(10000)]))
# return None
ret, out = run("xargs echo", stdin_data=b"\n".join([str(i).encode() for i in range(1000000)]), return_stdout=False)
self.assertEqual(out, None)
# log file with absolute path
log_file = os.path.join(self.tmp_dir, "a.log")
ret, out = run("echo XXX", logfile=log_file)
self.assertEqual(open(log_file, "r").read(), "XXX\n")
# log file with relative path
log_file = "b.log"
cwd = os.getcwd()
os.chdir(self.tmp_dir)
ret, out = run("echo XXX", logfile=log_file)
ret, out = run(["echo", "'hello'"])
self.assertEqual(ret, 0)
self.assertEqual(out, b"'hello'\n")
ret, out = run(["echo", "\" ' "])
self.assertEqual(ret, 0)
self.assertEqual(out, b"\" ' \n")
# test a longer output that needs to be read in several chunks
ret, out = run("echo -n '%s'; sleep 0.2; echo -n '%s'" % (10000 * "x", 10 * "a"), logfile=self.tmp_file, can_fail=True)
self.assertEqual(ret, 0)
self.assertEqual(out, 10000 * b"x" + 10 * b"a")
# check if log file is written properly; it is supposed to append data to existing content
self.assertEqual("\n".join(read_from_file(self.tmp_file)), "test" + 10000 * "x" + 10 * "a")
ret, out = run("exit 1", can_fail=True)
self.assertEqual(ret, 1)
self.assertRaises(RuntimeError, run, "exit 1")
# stdin test
ret, out = run("xargs -0 echo -n", stdin_data=b"\0".join([str(i).encode() for i in range(10000)]))
self.assertEqual(out, b" ".join([str(i).encode() for i in range(10000)]))
# return None
ret, out = run("xargs echo", stdin_data=b"\n".join([str(i).encode() for i in range(1000000)]), return_stdout=False)
self.assertEqual(out, None)
# log file with absolute path
log_file = os.path.join(self.tmp_dir, "a.log")
ret, out = run("echo XXX", logfile=log_file)
self.assertEqual(open(log_file, "r").read(), "XXX\n")
def test_run_univ_nl_stdout(self, mock_out):
ret, out = run("echo foo", universal_newlines=True, stdout=True)
self.assertEqual(ret, 0)
self.assertEqual(out, 'foo\n')
self.assertEqual(mock_out.getvalue(),
'foo\n')
def test_run_univ_nl_show_cmd_stdout(self, mock_out):
ret, out = run("echo foo", universal_newlines=True, show_cmd=True,
stdout=True)
self.assertEqual(ret, 0)
self.assertEqual(out, 'foo\n')
self.assertEqual(mock_out.getvalue(),
'COMMAND: echo foo\n-----------------\nfoo\n')
def test_run_in_text_mode(self):
"""test run with kwargs 'text', 'encoding' or/and 'errors' set. These kwargs are
added to Popen from python3.6(encoding, errors) and python3.7(text). Running test
with python version older than 3.7 is expected to fail
"""
ret, out = run("echo hello", text=True)
self.assertEqual(ret, 0)
self.assertEqual(out, "hello\n")
ret, out = run("echo hello", encoding="utf-8")
self.assertEqual(ret, 0)
self.assertEqual(out, "hello\n")
ret, out = run("echo hello", errors="strict")
self.assertEqual(ret, 0)
self.assertEqual(out, "hello\n")
def call(self, args, include_args_in_output=True):
"""
A wrapper around kobo.shortcuts.run. If ssh_exchange_identification or
max-concurrent-connections exceptions are thrown by ssh, up to 10 retries follows.
:param args: list of args for rsync
:type args: list
:param include_args_in_output: include the rsync arguments in output or not
:type include_args_in_output: bool
:return: (boolean indicating success or failure, output from rsync command)
:rtype: tuple of boolean and string
"""
for t in xrange(10):
rv, out = kobo.shortcuts.run(cmd=args, can_fail=True)
possible_known_exceptions = \
("ssh_exchange_identification:" in out) or ("max-concurrent-connections=25" in out)
if not (rv and possible_known_exceptions):
break
_logger.info(_("Connections limit reached, trying once again in thirty seconds."))
time.sleep(30)
if include_args_in_output:
message = "%s\n%s" % (args, out)
else:
message = out
return (rv == 0, message)