Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def SayHelloToMany(self, messages):
async for message in messages:
await anyio.sleep(0.05)
await yield_(HelloReply(message="Hello, " + message.name))
async def say_hello_goodbye(message: HelloRequest) -> Stream[HelloReply]:
await yield_(HelloReply(message="Hello, " + message.name))
await anyio.sleep(0.05)
await yield_(HelloReply(message="Goodbye, " + message.name))
async def test_cancel_propagation_with_inner_spawn():
async def g():
async with anyio.create_task_group() as g:
await g.spawn(anyio.sleep, 10)
await anyio.sleep(5)
assert False
async with anyio.create_task_group() as group:
await group.spawn(g)
await anyio.sleep(0.1)
await group.cancel_scope.cancel()
async def test_cancel_scope_cleared():
async with move_on_after(0.1):
await sleep(1)
await sleep(0)
async def SayHelloGoodbye(self, message):
await yield_(HelloReply(message="Hello, " + message.name))
await anyio.sleep(0.05)
await yield_(HelloReply(message="Goodbye, " + message.name))
async def heartbeat(self, interval, nursery):
await anyio.sleep(interval)
if self.ws is not None:
if self.heartbeat_acked:
self.heartbeat_acked = False
await self.send(self.HEARTBEAT, self.sequence)
await nursery.spawn(self.heartbeat, interval, nursery)
else:
await self.ws.close()
async def sleep_forever():
while True:
# Sleep for a day
await anyio.sleep(60 * 60 * 24)
async def print_memory_growth_statistics(interval_sec=10.0, set_pdb_trace_every=math.inf):
num_iters = 0
import objgraph
while True:
num_iters += 1
await anyio.sleep(interval_sec)
objgraph.show_growth()
if num_iters == set_pdb_trace_every:
pdb.set_trace()
num_iters = 0
async def _handle_close_connection_event(self, event: CloseConnection):
await anyio.sleep(0)
if self.state is ConnectionState.REMOTE_CLOSING:
await self._send(event.response())
await self._close_handshake.set()
async def sleeper():
if sleep_time >= 0:
await anyio.sleep(sleep_time)