Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_connection_5(self):
    """Try to open a connection with an ``http`` URI.

    A ``ValueError`` raised while opening is tolerated. If the connection
    does open, it must be a ``WebSocketConnection``.
    """
    async def attempt_connection():
        try:
            async with open_connection('http://foo.bar/baz') as connection:
                self.assertIsInstance(connection, WebSocketConnection)
        except ValueError:
            # A ValueError here means the URI was rejected because its
            # scheme is neither "ws" nor "wss"; that outcome is acceptable.
            pass

    anyio.run(attempt_connection)
async def test():
    """Open a websocket, close it with code 1000 and verify the close state."""
    reason = 'Super important close message.'
    # NOTE(review): nesting was reconstructed; confirm that every check
    # below belongs inside the task group block.
    async with anyio.create_task_group() as tg:
        connection = await create_websocket(
            tg, 'wss://echo.websocket.org', use_ssl=True)
        self.assertIsInstance(connection, WebSocketConnection)
        assert not connection.closed

        await connection.close(1000, reason)

        assert connection.closed
        self.assertIsNone(connection.subprotocol)
        self.assertEqual(connection.close_code.value, 1000)
        self.assertEqual(connection.close_reason, reason)
"""
# Synchronous worker: signals sleep_event, blocks for 0.2 seconds, records
# 'thread' in last_active, then signals finish_event.
def thread_worker():
nonlocal last_active
# Both event setters are invoked through run_async_from_thread rather than
# being called directly from this function.
run_async_from_thread(sleep_event.set)
time.sleep(0.2)
last_active = 'thread'
run_async_from_thread(finish_event.set)
# Coroutine that runs thread_worker through run_sync_in_worker_thread and
# then records 'task' in last_active.
async def task_worker():
nonlocal last_active
try:
# `cancellable` is not defined in this fragment; it comes from the enclosing scope.
await run_sync_in_worker_thread(thread_worker, cancellable=cancellable)
finally:
# Written in a finally clause, so it happens whether the await returns or raises.
last_active = 'task'
# Two events coordinate the worker thread with this code; last_active records
# which of thread_worker / task_worker wrote to it last.
sleep_event = create_event()
finish_event = create_event()
last_active = None
async with create_task_group() as tg:
await tg.spawn(task_worker)
# Wait until thread_worker has signalled sleep_event before cancelling.
await sleep_event.wait()
await tg.cancel_scope.cancel()
# finish_event is set by thread_worker as its final step.
await finish_event.wait()
# `expected_last_active` is not defined in this fragment; it comes from the enclosing scope.
assert last_active == expected_last_active
async def test_event_cancel(self):
    """Cancel a task that is waiting on an event, then set the event.

    The task must have started, but must not have resumed past its wait.
    """
    started = False
    resumed = False
    evt = create_event()

    async def waiter():
        nonlocal started, resumed
        started = True
        await evt.wait()
        resumed = True

    async with create_task_group() as group:
        await group.spawn(waiter)
        await group.cancel_scope.cancel()
        await evt.set()

    assert started
    assert not resumed
async def test_increase_tokens(self):
    """Raise a limiter's total tokens from 1 to 2 while one task holds it.

    Before the increase only the first event is set; after the task group
    exits, the second event must be set as well.
    """
    async def waiter():
        async with limiter:
            await waiter_inside.set()
            await second_entered.wait()

    async def setter():
        # Hold off until waiter() is inside the limiter block
        await waiter_inside.wait()
        async with limiter:
            # Reachable only once total_tokens has been increased
            await second_entered.set()

    limiter = create_capacity_limiter(1)
    waiter_inside, second_entered = create_event(), create_event()
    async with create_task_group() as group:
        await group.spawn(setter)
        await group.spawn(waiter)
        await wait_all_tasks_blocked()
        assert waiter_inside.is_set()
        assert not second_entered.is_set()
        await limiter.set_total_tokens(2)

    assert second_entered.is_set()
async def test_limit(self):
    """Run three tasks that each enter a one-token limiter five times.

    A shared flag is raised while a task is inside the limiter; every task
    asserts the flag is down on entry.
    """
    occupied = 0
    limiter = create_capacity_limiter(1)

    async def contender():
        nonlocal occupied
        for _ in range(5):
            async with limiter:
                assert occupied == 0
                occupied = 1
                await wait_all_tasks_blocked()
                occupied = 0

    async with create_task_group() as group:
        for _ in range(3):
            await group.spawn(contender)
async def test_run_in_custom_limiter():
    """Spawn four thread workers against a limiter created with 3 tokens.

    While the workers are blocked, three must be active and three tokens
    borrowed; after the group exits none is active and the peak was three.
    """
    release = threading.Event()
    active = peak = 0
    limiter = create_capacity_limiter(3)

    def thread_worker():
        nonlocal active, peak
        active += 1
        peak = max(active, peak)
        # Block until released by the test body (or for at most 1 second).
        release.wait(1)
        active -= 1

    async def task_worker():
        await run_sync_in_worker_thread(thread_worker, limiter=limiter)

    async with create_task_group() as group:
        for _ in range(4):
            await group.spawn(task_worker)

        await sleep(0.1)
        assert active == 3
        assert limiter.borrowed_tokens == 3
        release.set()

    assert active == 0
    assert peak == 3
# Sends twenty HelloRequest(name=data) values through yield_.
# `data` and `yield_` are not defined in this fragment.
async def gen():
for _ in range(20):
await yield_(HelloRequest(name=data))
# Feed gen() to stub.SayHelloToMany, collect all responses into a list, and
# expect exactly one response whose message equals `data` repeated 20 times.
self.assertEqual(
[response.message for response in await self.async_iterable_to_list(
stub.SayHelloToMany(gen()))],
[data * 20]
)
async def main():
    """Open an insecure channel to localhost and spawn ``worker`` ten times on it."""
    channel_cm = purerpc.insecure_channel("localhost", port)
    async with channel_cm as channel, anyio.create_task_group() as workers:
        for _ in range(10):
            await workers.spawn(worker, channel)


anyio.run(main)
GreeterStub = grpc_module.GreeterStub


async def worker(channel):
    """Call SayHello with ``metadata`` and check what comes back in the response.

    The response message is base64-decoded and unpickled. The first entry
    of the result must have the key "grpc-message-type"; the remaining
    entries must equal the metadata that was sent.
    """
    greeter = GreeterStub(channel)
    reply = await greeter.SayHello(HelloRequest(name="World"), metadata=metadata)
    # NOTE(review): pickle.loads on a network response is only acceptable
    # while the peer is a trusted local test server; confirm.
    decoded = base64.b64decode(reply.message)
    received_metadata = pickle.loads(decoded)
    print("Server received metadata (in client)", received_metadata)
    leading_key = received_metadata[0][0]
    self.assertEqual(leading_key, "grpc-message-type")
    self.assertEqual(metadata, received_metadata[1:])
async def main():
    """Open an insecure channel to localhost and run ``worker`` once on it."""
    channel_cm = purerpc.insecure_channel("localhost", port)
    async with channel_cm as channel:
        await worker(channel)


anyio.run(main)
)
GreeterStub = grpc_module.GreeterStub


async def worker(channel):
    """Call SayHello with ``metadata`` and check it comes back unchanged.

    The response message is base64-decoded and unpickled, and the result
    must equal the metadata that was sent.
    """
    greeter = GreeterStub(channel)
    reply = await greeter.SayHello(HelloRequest(name="World"), metadata=metadata)
    # NOTE(review): pickle.loads on a network response is only acceptable
    # while the peer is a trusted local test server; confirm.
    decoded = base64.b64decode(reply.message)
    received_metadata = pickle.loads(decoded)
    print("Server received metadata (in client)", received_metadata)
    self.assertEqual(metadata, received_metadata)
async def main():
    """Open an insecure channel to localhost and run ``worker`` once on it."""
    channel_cm = purerpc.insecure_channel("localhost", port)
    async with channel_cm as channel:
        await worker(channel)


anyio.run(main)