Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_increase_tokens(self):
    """Check that raising ``total_tokens`` unblocks a waiting borrower.

    ``holder`` takes the limiter's single token and then parks itself until
    ``second_borrowed`` fires.  ``late_borrower`` waits until the holder is
    inside the limiter block and then tries to enter the limiter as well,
    which can only succeed once the token count has been raised to 2.
    """
    limiter = create_capacity_limiter(1)
    holder_inside = create_event()
    second_borrowed = create_event()

    async def holder():
        async with limiter:
            await holder_inside.set()
            await second_borrowed.wait()

    async def late_borrower():
        # Do not compete for the token until holder() already owns it.
        await holder_inside.wait()
        async with limiter:
            # Reaching this line requires total_tokens to have been increased.
            await second_borrowed.set()

    async with create_task_group() as tg:
        await tg.spawn(late_borrower)
        await tg.spawn(holder)
        await wait_all_tasks_blocked()

        # Both tasks are now stuck: the holder owns the only token and the
        # late borrower is queued on the limiter.
        assert holder_inside.is_set()
        assert not second_borrowed.is_set()

        await limiter.set_total_tokens(2)

    # The task group only exits after both tasks finished, so the second
    # borrow must have happened by now.
    assert second_borrowed.is_set()
async def test_limit(self):
    """Check that a one-token limiter admits only one task at a time.

    Several tasks repeatedly enter the limiter.  Each one asserts that the
    shared flag is clear, sets it, yields until every other task is blocked,
    and clears it again.  If two tasks were ever inside the limiter block
    together, the second would see the flag set and fail its assertion.
    """
    task_count = 3
    rounds_per_task = 5
    in_section = 0
    limiter = create_capacity_limiter(1)

    async def contender():
        nonlocal in_section
        for _ in range(rounds_per_task):
            async with limiter:
                assert in_section == 0
                in_section = 1
                # Give the other tasks a chance to (wrongly) enter.
                await wait_all_tasks_blocked()
                in_section = 0

    async with create_task_group() as tg:
        for _ in range(task_count):
            await tg.spawn(contender)
async def test_run_in_custom_limiter():
    """Check that worker threads honour a caller-supplied capacity limiter.

    Four tasks each try to run a blocking function in a worker thread through
    a limiter created with 3 tokens.  While the threads are parked on a
    ``threading.Event`` the test expects exactly three of them to be running
    and three tokens to be borrowed.  After the event is set and the task
    group has exited, no thread may remain active and the observed peak must
    be 3.
    """
    release_threads = threading.Event()
    active = 0
    peak = 0
    limiter = create_capacity_limiter(3)

    def blocking_job():
        nonlocal active, peak
        active += 1
        if active > peak:
            peak = active

        # Park until the test releases us (or give up after one second).
        release_threads.wait(1)
        active -= 1

    async def run_job():
        await run_sync_in_worker_thread(blocking_job, limiter=limiter)

    async with create_task_group() as tg:
        for _ in range(4):
            await tg.spawn(run_job)

        # Let the worker threads start before inspecting the counters.
        await sleep(0.1)
        assert active == 3
        assert limiter.borrowed_tokens == 3
        release_threads.set()

    assert active == 0
    assert peak == 3
async def test_borrow(self):
    """Check token accounting before and while one token is borrowed."""
    limiter = create_capacity_limiter(2)

    def token_counts():
        # (total, available, borrowed), read in the same order every time.
        return (
            limiter.total_tokens,
            limiter.available_tokens,
            limiter.borrowed_tokens,
        )

    # Nothing borrowed yet: both tokens are available.
    assert token_counts() == (2, 2, 0)

    async with limiter:
        # Inside the block one token is held; the total is unchanged.
        assert token_counts() == (2, 1, 1)
async def test_borrow_twice(self):
    """Check that acquiring twice from the same borrower raises RuntimeError."""
    expected_message = (
        "this borrower is already holding one of this CapacityLimiter's tokens"
    )
    limiter = create_capacity_limiter(1)

    # The first acquisition succeeds; the second must be rejected.
    await limiter.acquire()
    with pytest.raises(RuntimeError) as excinfo:
        await limiter.acquire()

    excinfo.match(expected_message)
async def test_bad_init_type(self):
    """Check that a float token count is rejected with TypeError."""
    with pytest.raises(TypeError) as excinfo:
        create_capacity_limiter(1.0)

    excinfo.match('total_tokens must be an int or math.inf')
async def test_bad_release(self):
    """Check that releasing without holding a token raises RuntimeError."""
    expected_message = (
        "this borrower isn't holding any of this CapacityLimiter's tokens"
    )
    limiter = create_capacity_limiter(1)

    # Nothing has been acquired, so there is nothing to give back.
    with pytest.raises(RuntimeError) as excinfo:
        await limiter.release()

    excinfo.match(expected_message)