Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_apython_server(capfd, event_loop, monkeypatch):
def run_forever(self, orig=InteractiveEventLoop.run_forever):
if self.console_server is not None:
self.call_later(0, self.stop)
return orig(self)
with patch('aioconsole.InteractiveEventLoop.run_forever', run_forever):
with pytest.raises(SystemExit):
apython.run_apython(['--serve=:0'])
out, err = capfd.readouterr()
assert out.startswith('The console is being served on')
assert err == ''
host="127.0.0.1", port=0, banner='test')
address = server.sockets[0].getsockname()
stream = io.StringIO()
print_server(server, "test console", file=stream)
expected = "The test console is being served on 127.0.0.1:{}\n"
assert stream.getvalue() == expected.format(address[1])
reader, writer = yield from asyncio.open_connection(*address)
assert (yield from reader.readline()) == b'test\n'
writer.write(b'1+1\n')
assert (yield from reader.readline()) == b'>>> 2\n'
writer.write_eof()
assert (yield from reader.readline()) == b'>>> \n'
writer.close()
if compat.PY37:
yield from writer.wait_closed()
server.close()
yield from server.wait_closed()
def test_apython_with_prompt_control_and_ainput(capfd):
input_string = "{} ainput()\nhello\n".format(
'await' if compat.PY35 else 'yield from')
with patch('sys.stdin', new=io.StringIO(input_string)):
with pytest.raises(SystemExit):
apython.run_apython(
['--no-readline', '--banner=test', '--prompt-control=▲'])
out, err = capfd.readouterr()
assert out == ''
assert err == "test\n▲>>> ▲▲▲'hello'\n▲>>> ▲\n"
writer2.write('b\n')
yield from writer2.drain()
data = yield from reader.readline()
assert data == b'b\n'
assert stderr.getvalue() == 'b\n'
writer2.stream = Mock(spec={})
yield from writer2.drain()
data = yield from reader.read(2)
assert data == b'c\n'
assert reader.at_eof() == False
if compat.PY35:
assert (yield from reader.__aiter__()) == reader
assert (yield from reader.__anext__()) == b'd\n'
with pytest.raises(StopAsyncIteration):
yield from reader.__anext__()
else:
assert (yield from reader.read()) == b'd\n'
assert (yield from reader.read()) == b''
assert reader.at_eof() == True
def test_interact_traceback(event_loop, monkeypatch):
with stdcontrol(event_loop, monkeypatch) as (reader, writer):
banner = "A BANNER"
writer.write('1/0\n')
writer.stream.close()
yield from interact(banner=banner, stop=False)
# Check stderr
yield from assert_stream(reader, banner)
yield from assert_stream(
reader, sys.ps1 + 'Traceback (most recent call last):')
# Skip 3 (or 5) lines
for _ in range(3 if compat.PY35 else 5):
yield from reader.readline()
# Check stderr
yield from assert_stream(reader, "ZeroDivisionError: division by zero")
yield from assert_stream(reader, sys.ps1)
def stdcontrol(event_loop, monkeypatch):
# PS1
monkeypatch.setattr('sys.ps1', "[Hello!]", raising=False)
# Stdin control
stdin_read, stdin_write = os.pipe()
monkeypatch.setattr('sys.stdin', open(stdin_read))
writer = NonFileStreamWriter(open(stdin_write, 'w'), loop=event_loop)
# Stdout control
monkeypatch.setattr(sys, 'stdout', io.StringIO())
# Stderr control
stderr_read, stderr_write = os.pipe()
monkeypatch.setattr('sys.stderr', open(stderr_write, 'w'))
reader = NonFileStreamReader(open(stderr_read), loop=event_loop)
# Yield
yield reader, writer
# Check
assert sys.stdout.getvalue() == ''
tempfd.write(startupfile.encode())
tempfd.flush()
test_vectors = (
('print(foo)\n', '', '>>> 1\n'),
('print(hehe())\n', '', '>>> 42\n'),
('print(r)\n', '', '>>> 1.0\n'),
('pprint({1:2})\n', '{1: 2}\n', '>>> >>> \n'),
)
inputstr = ''.join([tv[0] for tv in test_vectors])
outstr = ''.join([tv[1] for tv in test_vectors])
errstr = 'test\n' + ''.join([tv[2] for tv in test_vectors])
with patch('sys.stdin', new=io.StringIO(inputstr)):
with pytest.raises(SystemExit):
apython.run_apython(['--banner=test'] + use_readline)
out, err = capfd.readouterr()
assert out == outstr
assert err == errstr
def test_apython_with_stdout_logs(capfd, use_readline):
with patch('sys.stdin', new=io.StringIO(
'import sys; sys.stdout.write("logging") or 7\n')):
with pytest.raises(SystemExit):
apython.run_apython(['--banner=test'] + use_readline)
out, err = capfd.readouterr()
assert out == 'logging'
assert err == 'test\n>>> 7\n>>> \n'
def test_apython_with_ainput(capfd, use_readline):
input_string = "{} ainput()\nhello\n".format(
'await' if compat.PY35 else 'yield from')
with patch('sys.stdin', new=io.StringIO(input_string)):
with pytest.raises(SystemExit):
apython.run_apython(['--banner=test'] + use_readline)
out, err = capfd.readouterr()
assert out == ''
assert err == "test\n>>> 'hello'\n>>> \n"
def test_apython_non_existing_module(capfd):
with pytest.raises(SystemExit):
apython.run_apython(['-m', 'idontexist'])
out, err = capfd.readouterr()
assert out == ''
assert "No module named idontexist" in err