Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_apprtc(self):
    """Two "apprtc" signaling endpoints share a room and swap offer/answer."""
    cli = argparse.ArgumentParser()
    add_signaling_arguments(cli)
    options = cli.parse_args(["-s", "apprtc"])

    # first endpoint connects and reports the room it was given
    sig_server = create_signaling(options)
    first_params = run(sig_server.connect())
    self.assertTrue(first_params["is_initiator"])

    # second endpoint is created with that same room id
    options.signaling_room = first_params["room_id"]
    sig_client = create_signaling(options)
    second_params = run(sig_client.connect())
    self.assertTrue(second_params["is_initiator"])

    # offer: server -> client (gather keeps argument order, so index 1 is
    # the value produced by the receiving side)
    offer_round = run(
        asyncio.gather(sig_server.send(offer), delay(sig_client.receive))
    )
    self.assertEqual(offer_round[1], offer)

    # answer: client -> server
    answer_round = run(
        asyncio.gather(sig_client.send(answer), delay(sig_server.receive))
    )
    self.assertEqual(answer_round[1], answer)

    # close both endpoints together
    run(asyncio.gather(sig_server.close(), sig_client.close()))
def test_unix_socket_abrupt_disconnect(self):
    """
    After both writers are closed abruptly, receive() yields None on each side.

    The endpoints first exchange an offer to prove the link works, then the
    underlying writers are closed directly (bypassing close()) and both
    sides are checked.
    """
    parser = argparse.ArgumentParser()
    add_signaling_arguments(parser)
    args = parser.parse_args(["-s", "unix-socket"])

    sig_server = create_signaling(args)
    sig_client = create_signaling(args)

    # connect
    run(sig_server.connect())
    run(sig_client.connect())

    res = run(asyncio.gather(sig_server.send(offer), delay(sig_client.receive)))
    self.assertEqual(res[1], offer)

    # break connection
    sig_client._writer.close()
    sig_server._writer.close()

    res = run(sig_server.receive())
    self.assertIsNone(res)

    # the client-side result was previously stored but never checked
    res = run(sig_client.receive())
    self.assertIsNone(res)

    # shutdown, as the non-abrupt socket tests do, so nothing is left open
    run(asyncio.gather(sig_server.close(), sig_client.close()))
def test_apprtc(self):
    """
    Connect two "apprtc" signaling endpoints to one room and swap offer/answer.

    The second endpoint is created with the room id returned to the first.
    Both endpoints are closed at the end of the test.
    """
    parser = argparse.ArgumentParser()
    add_signaling_arguments(parser)
    args = parser.parse_args(["-s", "apprtc"])

    # connect
    sig_server = create_signaling(args)
    server_params = run(sig_server.connect())
    self.assertTrue(server_params["is_initiator"])

    args.signaling_room = server_params["room_id"]
    sig_client = create_signaling(args)
    client_params = run(sig_client.connect())
    self.assertTrue(client_params["is_initiator"])

    # exchange signaling
    res = run(asyncio.gather(sig_server.send(offer), delay(sig_client.receive)))
    self.assertEqual(res[1], offer)

    res = run(asyncio.gather(sig_client.send(answer), delay(sig_server.receive)))
    self.assertEqual(res[1], answer)

    # shutdown (the comment was present but the close call was missing)
    run(asyncio.gather(sig_server.close(), sig_client.close()))
def test_apprtc_with_buffered_message(self):
    """
    An offer sent before the second party connects is still delivered to it.

    The first endpoint connects and sends the offer on its own; only then
    is the second endpoint created (with the first one's room id) and
    asked to receive. The answer is then exchanged in the usual way.
    """
    parser = argparse.ArgumentParser()
    add_signaling_arguments(parser)
    args = parser.parse_args(["-s", "apprtc"])

    # connect first party and send offer
    sig_server = create_signaling(args)
    server_params = run(sig_server.connect())
    self.assertTrue(server_params["is_initiator"])

    # the return value of send() is not inspected, so it is not stored
    run(sig_server.send(offer))

    # connect second party and receive offer
    args.signaling_room = server_params["room_id"]
    sig_client = create_signaling(args)
    client_params = run(sig_client.connect())
    self.assertTrue(client_params["is_initiator"])

    received = run(sig_client.receive())
    self.assertEqual(received, offer)

    # exchange answer; this result was previously stored but never asserted
    res = run(asyncio.gather(sig_client.send(answer), delay(sig_server.receive)))
    self.assertEqual(res[1], answer)

    # shutdown
    run(asyncio.gather(sig_server.close(), sig_client.close()))
def test_unix_socket(self):
    """Offer and answer make a round trip between two "unix-socket" endpoints."""
    cli = argparse.ArgumentParser()
    add_signaling_arguments(cli)
    options = cli.parse_args(["-s", "unix-socket"])

    # both endpoints are built from the same parsed arguments
    sig_server = create_signaling(options)
    sig_client = create_signaling(options)

    # bring the server side up first, then the client side
    run(sig_server.connect())
    run(sig_client.connect())

    # offer: server -> client
    offer_round = run(
        asyncio.gather(sig_server.send(offer), delay(sig_client.receive))
    )
    self.assertEqual(offer_round[1], offer)

    # answer: client -> server
    answer_round = run(
        asyncio.gather(sig_client.send(answer), delay(sig_server.receive))
    )
    self.assertEqual(answer_round[1], answer)

    # close both endpoints together
    run(asyncio.gather(sig_server.close(), sig_client.close()))
def test_tcp_socket(self):
    """Offer and answer make a round trip between two "tcp-socket" endpoints."""
    cli = argparse.ArgumentParser()
    add_signaling_arguments(cli)
    options = cli.parse_args(["-s", "tcp-socket"])

    sig_server = create_signaling(options)
    sig_client = create_signaling(options)

    # bring the server side up first, then the client side
    run(sig_server.connect())
    run(sig_client.connect())

    # one exchange per direction: (sender, receiver, message)
    exchanges = (
        (sig_server, sig_client, offer),
        (sig_client, sig_server, answer),
    )
    for sender, receiver, message in exchanges:
        outcome = run(
            asyncio.gather(sender.send(message), delay(receiver.receive))
        )
        # index 1 is the value produced by the receiving side
        self.assertEqual(outcome[1], message)

    # close both endpoints together
    run(asyncio.gather(sig_server.close(), sig_client.close()))
def test_unix_socket_abrupt_disconnect(self):
    """
    After both writers are closed abruptly, receive() yields None on each side.

    The endpoints first exchange an offer to prove the link works, then the
    underlying writers are closed directly (bypassing close()) and both
    sides are checked.
    """
    parser = argparse.ArgumentParser()
    add_signaling_arguments(parser)
    args = parser.parse_args(["-s", "unix-socket"])

    sig_server = create_signaling(args)
    sig_client = create_signaling(args)

    # connect
    run(sig_server.connect())
    run(sig_client.connect())

    res = run(asyncio.gather(sig_server.send(offer), delay(sig_client.receive)))
    self.assertEqual(res[1], offer)

    # break connection
    sig_client._writer.close()
    sig_server._writer.close()

    res = run(sig_server.receive())
    self.assertIsNone(res)

    # check the client side as well, not only the server side
    res = run(sig_client.receive())
    self.assertIsNone(res)

    # shutdown, as the non-abrupt socket tests do, so nothing is left open
    run(asyncio.gather(sig_server.close(), sig_client.close()))
def test_tcp_socket_abrupt_disconnect(self):
    """
    After both writers are closed abruptly, receive() yields None on each side.

    Same scenario as the unix-socket variant, run with "-s tcp-socket".
    """
    parser = argparse.ArgumentParser()
    add_signaling_arguments(parser)
    args = parser.parse_args(["-s", "tcp-socket"])

    sig_server = create_signaling(args)
    sig_client = create_signaling(args)

    # connect
    run(sig_server.connect())
    run(sig_client.connect())

    res = run(asyncio.gather(sig_server.send(offer), delay(sig_client.receive)))
    self.assertEqual(res[1], offer)

    # break connection
    sig_client._writer.close()
    sig_server._writer.close()

    res = run(sig_server.receive())
    self.assertIsNone(res)

    # check the client side as well, not only the server side
    res = run(sig_client.receive())
    self.assertIsNone(res)

    # shutdown, as the non-abrupt socket tests do, so nothing is left open
    run(asyncio.gather(sig_server.close(), sig_client.close()))
def test_copy_and_paste(self):
    """
    Set up two "copy-and-paste" signaling endpoints and queue-backed mock I/O.

    NOTE(review): the visible body stops once the mock classes are defined;
    nothing shown here uses them yet.
    """
    cli = argparse.ArgumentParser()
    add_signaling_arguments(cli)
    args = cli.parse_args(["-s", "copy-and-paste"])

    sig_server = create_signaling(args)
    sig_client = create_signaling(args)

    class MockReader:
        """Reader stand-in: readline() awaits the next item from a queue."""

        def __init__(self, queue):
            self.queue = queue

        async def readline(self):
            next_line = await self.queue.get()
            return next_line

    class MockWritePipe:
        """Write-pipe stand-in: write() encodes the message and queues it."""

        def __init__(self, queue, encoding):
            self.encoding = encoding
            self.queue = queue

        def write(self, msg):
            # write() is a plain (non-async) method, so the put is scheduled
            # on the event loop rather than awaited
            encoded = msg.encode(self.encoding)
            asyncio.ensure_future(self.queue.put(encoded))
await consume_signaling(pc, signaling)
if __name__ == "__main__":
    # Command line: a mandatory role, an optional verbosity counter, plus
    # whatever options add_signaling_arguments() registers.
    cli = argparse.ArgumentParser(description="VPN over data channel")
    cli.add_argument("role", choices=["offer", "answer"])
    cli.add_argument("--verbose", "-v", action="count")
    add_signaling_arguments(cli)
    args = cli.parse_args()

    # debug logging is enabled only when -v was given at least once
    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)

    # the tunnel device is named after the chosen role
    tap = tuntap.Tun(name="revpn-%s" % args.role)

    signaling = create_signaling(args)
    pc = RTCPeerConnection()

    # pick the coroutine factory matching the role, then build the coroutine
    entry_point = run_offer if args.role == "offer" else run_answer
    coro = entry_point(pc, signaling, tap)

    # run event loop
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(coro)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop; fall through to the cleanup
        pass
    finally:
        # tear down in order: peer connection, signaling, tunnel device
        for closing in (pc.close, signaling.close):
            loop.run_until_complete(closing())
        tap.close()