How to use aiorwlock - 10 common examples

To help you get started, we’ve selected a few aiorwlock examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
async def test_repr(loop):
    rwlock = RWLock(loop=loop)
    assert 'RWLock' in rwlock.__repr__()
    assert 'WriterLock: [unlocked' in rwlock.__repr__()
    assert 'ReaderLock: [unlocked' in rwlock.__repr__()

    # reader lock __repr__
    await rwlock.reader_lock.acquire()
    assert 'ReaderLock: [locked]' in rwlock.__repr__()
    rwlock.reader_lock.release()
    assert 'ReaderLock: [unlocked]' in rwlock.__repr__()

    # writer lock __repr__
    await rwlock.writer_lock.acquire()
    assert 'WriterLock: [locked]' in rwlock.__repr__()
    rwlock.writer_lock.release()
    assert 'WriterLock: [unlocked]' in rwlock.__repr__()
github aio-libs / aiorwlock / tests / test_corner_cases.py View on Github external
async def test_get_write_then_read_and_write_again(loop):
    rwlock = RWLock(loop=loop)
    rl = rwlock.reader
    wl = rwlock.writer

    f = loop.create_future()
    writes = []

    async def get_write_lock():
        await f
        with should_fail(.1, loop):
            async with wl:
                assert wl.locked
                writes.append('should not be here')

    ensure_future(get_write_lock(), loop=loop)

    async with wl:
github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
async def test_writer_success_with_statement(loop):
    # Verify that a writer can get access
    rwlock = RWLock(loop=loop)
    N = 5
    reads = 0
    writes = 0

    async def r():
        # read until we achive write successes
        nonlocal reads, writes
        while writes < 2:
            # print("current pre-reads", reads)
            async with rwlock.reader_lock:
                reads += 1
                # print("current reads", reads)

    async def w():
        nonlocal reads, writes
        while reads == 0:
github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
def test_ctor_noloop_writer(loop):
    asyncio.set_event_loop(loop)
    rwlock = RWLock().writer_lock
    assert rwlock._lock._loop is loop
github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
async def test_read_upgrade_write_release(loop):
    rwlock = RWLock(loop=loop)
    await rwlock.writer_lock.acquire()
    await rwlock.reader_lock.acquire()
    await rwlock.reader_lock.acquire()

    await rwlock.reader_lock.acquire()
    rwlock.reader_lock.release()

    assert rwlock.writer_lock.locked

    rwlock.writer_lock.release()
    assert not rwlock.writer_lock.locked

    assert rwlock.reader.locked

    with pytest.raises(RuntimeError):
        await rwlock.writer_lock.acquire()
github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
def test_ctor_loop_writer(loop):
    rwlock = RWLock(loop=loop).writer_lock
    assert rwlock._lock._loop is loop
github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
def test_raise_error_on_with_for_reader_lock(loop, fast_track):
    rwlock = RWLock(loop=loop, fast=fast_track)
    with pytest.raises(RuntimeError):
        with rwlock.reader_lock:
            pass
github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
async def test_release_unlocked(loop):
    rwlock = RWLock(loop=loop)
    with pytest.raises(RuntimeError):
        rwlock.reader_lock.release()
github aio-libs / aiorwlock / tests / test_corner_cases.py View on Github external
def should_fail(timeout, loop):
    task = current_task(loop)

    handle = loop.call_later(timeout, task.cancel)
    try:
        yield
    except asyncio.CancelledError:
        handle.cancel()
        return
    else:
        msg = 'Inner task expected to be cancelled: {}'.format(task)
        pytest.fail(msg)
github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
async def task():
            tid = current_task(loop=self._loop)
            self.started.append(tid)
            try:
                await f()
            finally:
                self.finished.append(tid)
                while not self._can_exit:
                    await asyncio.sleep(0.01, loop=self._loop)

aiorwlock

Read write lock for asyncio.

Apache-2.0
Latest version published 1 month ago

Package Health Score

88 / 100
Full package analysis