Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def run(configuration: QuicConfiguration, url: str, data: str) -> None:
# parse URL
parsed = urlparse(url)
assert parsed.scheme == "https", "Only https:// URLs are supported."
if ":" in parsed.netloc:
host, port_str = parsed.netloc.split(":")
port = int(port_str)
else:
host = parsed.netloc
port = 443
async with connect(
host,
port,
configuration=configuration,
create_protocol=H3Dispatcher,
session_ticket_handler=save_session_ticket,
) as dispatch:
client = AsyncClient(dispatch=cast(AsyncDispatcher, dispatch))
# perform request
start = time.time()
if data is not None:
response = await client.post(
url,
data=data.encode(),
headers={"content-type": "application/x-www-form-urlencoded"},
)
print_response: bool,
) -> None:
# parse URL
parsed = urlparse(url)
assert parsed.scheme in (
"https",
"wss",
), "Only https:// or wss:// URLs are supported."
if ":" in parsed.netloc:
host, port_str = parsed.netloc.split(":")
port = int(port_str)
else:
host = parsed.netloc
port = 443
async with connect(
host,
port,
configuration=configuration,
create_protocol=HttpClient,
session_ticket_handler=save_session_ticket,
) as client:
client = cast(HttpClient, client)
if parsed.scheme == "wss":
ws = await client.websocket(url, subprotocols=["chat", "superchat"])
# send some messages and receive reply
for i in range(2):
message = "Hello {}, WebSocket!".format(i)
print("> " + message)
await ws.send(message)
async def run(configuration: QuicConfiguration, url: str, data: str, headers: Dict = {}, allow_push: bool = True, response: Response = None) -> None:
# parse URL
parsed = urlparse(url)
assert parsed.scheme in (
"https"
), "Only https:// URLs are supported."
if ":" in parsed.netloc:
host, port_str = parsed.netloc.split(":")
port = int(port_str)
else:
host = parsed.netloc
port = 443
async with connect(
host,
port,
configuration=configuration,
create_protocol=HttpClient,
session_ticket_handler=save_session_ticket,
) as client:
client = cast(HttpClient, client)
if parsed.scheme == "wss":
pass
else:
# perform request
start = time.time()
if data is not None:
headers ['content-type'] = "application/x-www-form-urlencoded"
http_events = await client.post(
def test_subtract_split(self):
rangeset = RangeSet()
rangeset.add(0, 10)
rangeset.subtract(2, 5)
self.assertEqual(list(rangeset), [range(0, 2), range(5, 10)])
def test_pull_short_header_no_fixed_bit(self):
buf = Buffer(data=b"\x00")
with self.assertRaises(ValueError) as cm:
pull_quic_header(buf, host_cid_length=8)
self.assertEqual(str(cm.exception), "Packet fixed bit is zero")
def test_pull_long_header_no_fixed_bit(self):
buf = Buffer(data=b"\x80\xff\x00\x00\x11\x00\x00")
with self.assertRaises(ValueError) as cm:
pull_quic_header(buf, host_cid_length=8)
self.assertEqual(str(cm.exception), "Packet fixed bit is zero")
def test_handle_padding_frame(self):
client = create_standalone_client(self)
# no more padding
buf = Buffer(data=b"")
client._handle_padding_frame(
client_receive_context(client), QuicFrameType.PADDING, buf
)
self.assertEqual(buf.tell(), 0)
# padding until end
buf = Buffer(data=bytes(10))
client._handle_padding_frame(
client_receive_context(client), QuicFrameType.PADDING, buf
)
self.assertEqual(buf.tell(), 10)
# padding then something else
buf = Buffer(data=bytes(10) + b"\x01")
client._handle_padding_frame(
client_receive_context(client), QuicFrameType.PADDING, buf
def test_data_slice(self):
buf = Buffer(data=b"\x08\x07\x06\x05\x04\x03\x02\x01")
self.assertEqual(buf.data_slice(0, 8), b"\x08\x07\x06\x05\x04\x03\x02\x01")
self.assertEqual(buf.data_slice(1, 3), b"\x07\x06")
with self.assertRaises(BufferReadError):
buf.data_slice(-1, 3)
with self.assertRaises(BufferReadError):
buf.data_slice(0, 9)
with self.assertRaises(BufferReadError):
buf.data_slice(1, 0)
def test_pull_retry(self):
buf = Buffer(data=load("retry.bin"))
header = pull_quic_header(buf, host_cid_length=8)
self.assertTrue(header.is_long_header)
self.assertEqual(header.version, QuicProtocolVersion.DRAFT_24)
self.assertEqual(header.packet_type, PACKET_TYPE_RETRY)
self.assertEqual(header.destination_cid, binascii.unhexlify("fee746dfde699d61"))
self.assertEqual(header.source_cid, binascii.unhexlify("59aa0942fd2f11e9"))
self.assertEqual(
header.original_destination_cid, binascii.unhexlify("d61e7448e0d63dff")
)
self.assertEqual(
header.token,
binascii.unhexlify(
"5282f57f85a1a5c50de5aac2ff7dba43ff34524737099ec41c4b8e8c76734f935e8efd51177dbbe764"
),
)
self.assertEqual(header.rest_length, 0)
def test_handle_datagram_frame_not_allowed(self):
client = create_standalone_client(self, max_datagram_frame_size=None)
with self.assertRaises(QuicConnectionError) as cm:
client._handle_datagram_frame(
client_receive_context(client),
QuicFrameType.DATAGRAM,
Buffer(data=b"hello"),
)
self.assertEqual(cm.exception.error_code, QuicErrorCode.PROTOCOL_VIOLATION)
self.assertEqual(cm.exception.frame_type, QuicFrameType.DATAGRAM)
self.assertEqual(cm.exception.reason_phrase, "Unexpected DATAGRAM frame")