Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def worker(port, queue, num_concurrent_streams, num_requests_per_stream,
                 num_rounds, message_size, load_type):
    """Run load rounds against the Greeter service and report them on ``queue``.

    Every round spawns ``num_concurrent_streams`` concurrent load tasks, waits
    for all of them to finish and then puts two items on ``queue``: first the
    round's requests-per-second figure, then the list of per-task results.
    After the last round the queue is closed and its feeder thread joined.

    Args:
        port: Port on ``localhost`` that the channel connects to.
        queue: Result sink providing ``put``, ``close`` and ``join_thread``
            (presumably a ``multiprocessing.Queue`` -- confirm with the caller).
        num_concurrent_streams: Number of load tasks spawned per round.
        num_requests_per_stream: Forwarded to each load task; also used to
            compute the requests-per-second figure.
        num_rounds: Number of rounds to run.
        message_size: Forwarded unchanged to each load task.
        load_type: ``"unary"`` or ``"stream"``; selects the load function.

    Raises:
        ValueError: If ``load_type`` is neither ``"unary"`` nor ``"stream"``.
    """
    # Validate before opening the channel so a bad argument fails fast
    # instead of after a connection has already been established.
    if load_type == "unary":
        load_fn = do_load_unary
    elif load_type == "stream":
        load_fn = do_load_stream
    else:
        raise ValueError(f"Unknown load type: {load_type}")

    async with purerpc.insecure_channel("localhost", port) as channel:
        stub = GreeterStub(channel)
        for _ in range(num_rounds):
            # perf_counter() is monotonic and high resolution; time.time() is
            # wall-clock and may jump, skewing (or zeroing) the elapsed time.
            start = time.perf_counter()
            task_results = anyio.create_queue(sys.maxsize)
            # Leaving the task group blocks until every spawned task is done,
            # so the elapsed time covers the whole round.
            async with anyio.create_task_group() as task_group:
                for _ in range(num_concurrent_streams):
                    await task_group.spawn(load_fn, task_results, stub,
                                           num_requests_per_stream, message_size)
            elapsed = time.perf_counter() - start
            rps = num_concurrent_streams * num_requests_per_stream / elapsed
            queue.put(rps)
            # One result is collected per spawned task.
            results = [await task_results.get()
                       for _ in range(num_concurrent_streams)]
            queue.put(results)
        queue.close()
        queue.join_thread()
# NOTE(review): this is the tail of an initializer whose `def` line is not
# visible here; `content`, `break_at` and `self._content` are defined outside
# this fragment.
# chunk the message up
# A list is stored as-is (taken as already chunked); anything else is cut
# into consecutive slices of at most `break_at` items taken from
# `self._content`.
if isinstance(content, list):
self._message_chunks = content
else:
self._message_chunks = [
self._content[i : i + break_at] for i in range(0, len(self._content), break_at)
]
#: The current page this paginator is on.
self.page = 0
#: The message object that is being edited.
# Starts as None; nothing in this fragment assigns the actual message.
self._message = None # type: Message
# Not running until something outside this fragment flips the flag.
self._running = False
# Queue with capacity 1 (presumably carries incoming reactions -- confirm
# against the consumer).
self._reaction_queue = anyio.create_queue(1)
# Initialise the per-stream state: stores the stream id, the gRPC connection,
# the gRPC socket and the raw socket wrapper, then creates the
# synchronisation primitives used by the stream.
# NOTE(review): indentation was lost in this copy; the body is taken to end
# at `_end_stream_event` because the assignments that follow read names that
# are not parameters of this method.
def __init__(self, grpc_connection: GRPCConnection, stream_id: int, socket: SocketWrapper,
grpc_socket: "GRPCSocket"):
self._stream_id = stream_id
self._grpc_connection = grpc_connection
self._grpc_socket = grpc_socket
self._socket = socket
# Event object (presumably set when the flow-control window changes --
# confirm against the code that sets it).
self._flow_control_update_event = anyio.create_event()
# Capacity of sys.maxsize, i.e. effectively unbounded.
self._incoming_events = anyio.create_queue(sys.maxsize)
self._response_started = False
# A new stream starts in the OPEN state.
self._state = GRPCStreamState.OPEN
# Both stream events start as None; they are not assigned in this method.
self._start_stream_event = None
self._end_stream_event = None
# NOTE(review): tail of an initializer whose `def` line is not visible here;
# `path`, `subprotocols`, `headers`, `message_queue_size` and
# `max_message_size` are defined outside this fragment.
self._path = path
self._subprotocols = subprotocols
# Negotiated subprotocol and handshake headers start as None; nothing in this
# fragment fills them in.
self._connection_subprotocol = None
self._handshake_headers = None
self._headers = headers
self.message_queue_size = message_queue_size
self.max_message_size = max_message_size
# Rejection details start empty: no status, no headers, empty bytes body.
self._reject_status = None
self._reject_headers = None
self._reject_body = b''
self._stream_lock = anyio.create_lock()
# Running size and collected parts of a message (presumably one that is
# still being assembled -- confirm against the receive path).
self._message_size = 0
self._message_parts = []
# Queue capacity is taken from `message_queue_size`.
self._event_queue = anyio.create_queue(self.message_queue_size)
# Insertion-ordered mapping (presumably of outstanding pings -- confirm).
self._pings = OrderedDict()
# Separate events for the opening and the closing handshake.
self._open_handshake = anyio.create_event()
self._close_handshake = anyio.create_event()
# Async generator: spawns one `_send_task_res_to_q` call per given task and
# yields exactly one queue item per task.
# NOTE(review): indentation was lost in this copy, so it is not visible
# whether the draining loop at the end runs inside or after the task-group
# block -- confirm against the upstream source before re-indenting.
async def _aio_gather_iter_pairs(*aio_tasks):
"""Spawn async tasks and yield with pairs of ids with results."""
aio_tasks_num = len(aio_tasks)
# The queue has one slot per task.
task_res_q = create_queue(aio_tasks_num)
async with all_subtasks_awaited() as task_group:
# Each spawned call receives the result queue, the task's positional index
# (used as its id) and the task itself.
for task_id, task in enumerate(aio_tasks):
await task_group.spawn(
_send_task_res_to_q,
task_res_q,
task_id, task,
)
# Items come out in the order the queue delivers them, which need not match
# the order in which the tasks were passed in.
for _ in range(aio_tasks_num):
yield await task_res_q.get()
async def worker(port, num_concurrent_streams, num_requests_per_stream,
                 num_rounds, message_size, load_type):
    """Run load rounds against the Greeter service and print per-round stats.

    Every round spawns ``num_concurrent_streams`` concurrent load tasks, waits
    for all of them to finish and prints the round index, the achieved
    requests per second and the mean of the per-task values scaled by 1000
    (presumably seconds converted to milliseconds -- confirm against the load
    functions).

    Args:
        port: Port on ``localhost`` that the channel connects to.
        num_concurrent_streams: Number of load tasks spawned per round.
        num_requests_per_stream: Forwarded to each load task; also used to
            compute the requests-per-second figure.
        num_rounds: Number of rounds to run.
        message_size: Forwarded unchanged to each load task.
        load_type: ``"unary"`` or ``"stream"``; selects the load function.

    Raises:
        ValueError: If ``load_type`` is neither ``"unary"`` nor ``"stream"``.
    """
    # Validate before opening the channel so a bad argument fails fast
    # instead of after a connection has already been established.
    if load_type == "unary":
        load_fn = do_load_unary
    elif load_type == "stream":
        load_fn = do_load_stream
    else:
        raise ValueError(f"Unknown load type: {load_type}")

    async with purerpc.insecure_channel("localhost", port) as channel:
        stub = GreeterStub(channel)
        for idx in range(num_rounds):
            # perf_counter() is monotonic and high resolution; time.time() is
            # wall-clock and may jump, skewing (or zeroing) the elapsed time.
            start = time.perf_counter()
            task_results = anyio.create_queue(sys.maxsize)
            # Leaving the task group blocks until every spawned task is done,
            # so the elapsed time covers the whole round.
            async with anyio.create_task_group() as task_group:
                for _ in range(num_concurrent_streams):
                    await task_group.spawn(load_fn, task_results, stub,
                                           num_requests_per_stream, message_size)
            elapsed = time.perf_counter() - start
            rps = num_concurrent_streams * num_requests_per_stream / elapsed
            # One value is collected per spawned task.
            latencies = [await task_results.get()
                         for _ in range(num_concurrent_streams)]
            print("Round", idx, "rps", rps, "avg latency", 1000 * sum(latencies) / len(latencies))
def open_channel(capacity=0):
asynclib = anyio.sniffio.current_async_library()
if asynclib == 'trio':
import trio
return trio.open_memory_channel(capacity)
class ClosedResourceError(IOError):
pass
senders = set()
sentinel = object()
send_queue = anyio.create_queue(1)
maxsize = 0 if capacity == math.inf else 1 if capacity == 0 else capacity
receive_queue = anyio.create_queue(maxsize)
class ReceiveChannel:
def __aiter__(self):
return self
async def __anext__(self):
try:
return await self.receive()
except ClosedResourceError:
raise StopAsyncIteration
async def receive(self):
item = sentinel
while item == sentinel:
if not senders and receive_queue.empty():