How to use the aiolimiter.AsyncLimiter function in aiolimiter

To help you get started, we’ve selected a few aiolimiter examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github mjpieters / aiolimiter / tests / test_aiolimiter.py View on Github external
async def test_acquire(event_loop, task):
    current_time = 0

    def mocked_time():
        return current_time

    # capacity released every 2 seconds
    limiter = AsyncLimiter(5, 10)

    with mock.patch.object(event_loop, "time", mocked_time):
        tasks = [asyncio.ensure_future(task(limiter)) for _ in range(10)]

        pending = await wait_for_n_done(tasks, 5)
        assert len(pending) == 5

        current_time = 3  # releases capacity for one and some buffer
        assert limiter.has_capacity()

        pending = await wait_for_n_done(pending, 1)
        assert len(pending) == 4

        current_time = 7  # releases capacity for two more, plus buffer
        pending = await wait_for_n_done(pending, 2)
        assert len(pending) == 2
github mjpieters / aiolimiter / tests / test_aiolimiter.py View on Github external
def test_attributes():
    limiter = AsyncLimiter(42, 81)
    assert limiter.max_rate == 42
    assert limiter.time_period == 81
github mjpieters / aiolimiter / tests / test_aiolimiter.py View on Github external
async def test_has_capacity():
    limiter = AsyncLimiter(1)
    assert limiter.has_capacity()
    assert not limiter.has_capacity(42)

    await limiter.acquire()
    assert not limiter.has_capacity()
github mjpieters / aiolimiter / tests / test_aiolimiter.py View on Github external
async def test_over_acquire():
    limiter = AsyncLimiter(1)
    with pytest.raises(ValueError):
        await limiter.acquire(42)

aiolimiter

asyncio rate limiter, a leaky bucket implementation

MIT
Latest version published 17 days ago

Package Health Score

79 / 100
Full package analysis