Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def run(configuration: QuicConfiguration, url: str, data: str) -> None:
    """Connect to the host named in *url* and, if *data* is given, POST it.

    The connection is opened with ``connect()`` using ``H3Dispatcher`` as the
    protocol, and the request is issued through an ``AsyncClient`` bound to
    that dispatcher.

    Args:
        configuration: configuration handed to ``connect()``.
        url: target URL; its scheme must be ``https``. The port defaults to
            443 when the URL does not carry one.
        data: request body. When it is not ``None`` it is encoded with
            ``str.encode()`` and sent as ``application/x-www-form-urlencoded``.

    Raises:
        AssertionError: if the URL scheme is not ``https``.
        ValueError: if the URL carries a malformed port.
    """
    # parse URL
    parsed = urlparse(url)
    assert parsed.scheme == "https", "Only https:// URLs are supported."

    # Let urllib split host and port: a manual split on ":" raises for IPv6
    # literals ("[::1]:4433") and for URLs that carry userinfo ("u:p@host").
    host = parsed.hostname or parsed.netloc
    port = parsed.port if parsed.port is not None else 443

    async with connect(
        host,
        port,
        configuration=configuration,
        create_protocol=H3Dispatcher,
        session_ticket_handler=save_session_ticket,
    ) as dispatch:
        client = AsyncClient(dispatch=cast(AsyncDispatcher, dispatch))

        # perform request
        start = time.time()  # timestamp taken just before the request is sent
        if data is not None:
            response = await client.post(
                url,
                data=data.encode(),
                headers={"content-type": "application/x-www-form-urlencoded"},
            )
print_response: bool,
) -> None:
# parse URL
parsed = urlparse(url)
assert parsed.scheme in (
"https",
"wss",
), "Only https:// or wss:// URLs are supported."
if ":" in parsed.netloc:
host, port_str = parsed.netloc.split(":")
port = int(port_str)
else:
host = parsed.netloc
port = 443
async with connect(
host,
port,
configuration=configuration,
create_protocol=HttpClient,
session_ticket_handler=save_session_ticket,
) as client:
client = cast(HttpClient, client)
if parsed.scheme == "wss":
ws = await client.websocket(url, subprotocols=["chat", "superchat"])
# send some messages and receive reply
for i in range(2):
message = "Hello {}, WebSocket!".format(i)
print("> " + message)
await ws.send(message)
async def run(configuration: QuicConfiguration, url: str, data: str, headers: Dict = {}, allow_push: bool = True, response: Response = None) -> None:
# parse URL
parsed = urlparse(url)
assert parsed.scheme in (
"https"
), "Only https:// URLs are supported."
if ":" in parsed.netloc:
host, port_str = parsed.netloc.split(":")
port = int(port_str)
else:
host = parsed.netloc
port = 443
async with connect(
host,
port,
configuration=configuration,
create_protocol=HttpClient,
session_ticket_handler=save_session_ticket,
) as client:
client = cast(HttpClient, client)
if parsed.scheme == "wss":
pass
else:
# perform request
start = time.time()
if data is not None:
headers ['content-type'] = "application/x-www-form-urlencoded"
http_events = await client.post(
async def run_client_ping(host, port=4433):
    """Connect to *host*:*port* and ping the peer twice.

    A fresh client ``QuicConfiguration`` is built and its verify locations
    are loaded from ``SERVER_CACERTFILE`` before connecting.
    """
    client_config = QuicConfiguration(is_client=True)
    client_config.load_verify_locations(cafile=SERVER_CACERTFILE)

    async with connect(host, port, configuration=client_config) as connection:
        # two consecutive pings over the same connection
        for _ in range(2):
            await connection.ping()
async def run_client(
    self,
    host,
    port=4433,
    cadata=None,
    cafile=SERVER_CACERTFILE,
    configuration=None,
    request=b"ping",
    **kwargs
):
    """Send *request* on a new stream and return everything the peer replies.

    Args:
        host: host passed to ``connect()``.
        port: port passed to ``connect()``.
        cadata: forwarded to ``configuration.load_verify_locations``.
        cafile: forwarded to ``configuration.load_verify_locations``.
        configuration: configuration to use; a client ``QuicConfiguration``
            is created when this is ``None``.
        request: bytes written to the stream before EOF is sent.
        **kwargs: extra keyword arguments forwarded to ``connect()``.

    Returns:
        The result of ``reader.read()`` on the stream.
    """
    if configuration is None:
        configuration = QuicConfiguration(is_client=True)
    configuration.load_verify_locations(cadata=cadata, cafile=cafile)

    async with connect(host, port, configuration=configuration, **kwargs) as client:
        # the connection is already established here, so this returns at once
        await client.wait_connected()

        stream_reader, stream_writer = await client.create_stream()
        self.assertEqual(stream_writer.can_write_eof(), True)
        self.assertEqual(stream_writer.get_extra_info("stream_id"), 0)

        # send the whole request, then half-close the stream
        stream_writer.write(request)
        stream_writer.write_eof()

        reply = await stream_reader.read()

    # the connection is already closed here, so this returns at once
    await client.wait_closed()

    return reply
async def run_client_change_connection_id(host, port=4433):
    """Connect to *host*:*port*, ping, change the connection ID, ping again.

    A fresh client ``QuicConfiguration`` is built and its verify locations
    are loaded from ``SERVER_CACERTFILE`` before connecting.
    """
    client_config = QuicConfiguration(is_client=True)
    client_config.load_verify_locations(cafile=SERVER_CACERTFILE)

    async with connect(host, port, configuration=client_config) as connection:
        await connection.ping()
        # switch connection ID between the two pings
        connection.change_connection_id()
        await connection.ping()
async def run_client_key_update(host, port=4433):
    """Connect to *host*:*port*, ping, request a key update, ping again.

    A fresh client ``QuicConfiguration`` is built and its verify locations
    are loaded from ``SERVER_CACERTFILE`` before connecting.
    """
    client_config = QuicConfiguration(is_client=True)
    client_config.load_verify_locations(cafile=SERVER_CACERTFILE)

    async with connect(host, port, configuration=client_config) as connection:
        await connection.ping()
        # ask for a key update between the two pings
        connection.request_key_update()
        await connection.ping()
async def run_client_writelines(host, port=4433):
    """Write two byte chunks with ``writelines`` and return the peer's reply.

    A fresh client ``QuicConfiguration`` is built and its verify locations
    are loaded from ``SERVER_CACERTFILE`` before connecting.

    Returns:
        The result of ``reader.read()`` on the stream.
    """
    client_config = QuicConfiguration(is_client=True)
    client_config.load_verify_locations(cafile=SERVER_CACERTFILE)

    async with connect(host, port, configuration=client_config) as connection:
        stream_reader, stream_writer = await connection.create_stream()
        assert stream_writer.can_write_eof() is True

        # send both chunks in one call, then half-close the stream
        chunks = [b"01234567", b"89012345"]
        stream_writer.writelines(chunks)
        stream_writer.write_eof()

        reply = await stream_reader.read()
        return reply
async def run_client_without_config(host, port=4433):
    """Connect to *host*:*port* without passing a configuration and ping once."""
    async with connect(host, port) as connection:
        await connection.ping()
async def run(configuration: QuicConfiguration, host: str, port: int) -> None:
    """Connect to *host*:*port* using ``SiduckClient`` and perform one quack.

    An info message is logged before the quack is sent and again once
    ``quack()`` has returned.
    """
    async with connect(
        host,
        port,
        configuration=configuration,
        create_protocol=SiduckClient,
    ) as protocol:
        # connect() yields the protocol instance; narrow its type for checkers
        siduck = cast(SiduckClient, protocol)

        logger.info("sending quack")
        await siduck.quack()
        logger.info("received quack-ack")