How to use the aiorwlock.RWLock function in aiorwlock

To help you get started, we’ve selected a few aiorwlock examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
async def test_repr(loop):
    rwlock = RWLock(loop=loop)
    assert 'RWLock' in rwlock.__repr__()
    assert 'WriterLock: [unlocked' in rwlock.__repr__()
    assert 'ReaderLock: [unlocked' in rwlock.__repr__()

    # reader lock __repr__
    await rwlock.reader_lock.acquire()
    assert 'ReaderLock: [locked]' in rwlock.__repr__()
    rwlock.reader_lock.release()
    assert 'ReaderLock: [unlocked]' in rwlock.__repr__()

    # writer lock __repr__
    await rwlock.writer_lock.acquire()
    assert 'WriterLock: [locked]' in rwlock.__repr__()
    rwlock.writer_lock.release()
    assert 'WriterLock: [unlocked]' in rwlock.__repr__()
github aio-libs / aiorwlock / tests / test_corner_cases.py View on Github external
async def test_get_write_then_read_and_write_again(loop):
    rwlock = RWLock(loop=loop)
    rl = rwlock.reader
    wl = rwlock.writer

    f = loop.create_future()
    writes = []

    async def get_write_lock():
        await f
        with should_fail(.1, loop):
            async with wl:
                assert wl.locked
                writes.append('should not be here')

    ensure_future(get_write_lock(), loop=loop)

    async with wl:
github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
async def test_writer_success_with_statement(loop):
    # Verify that a writer can get access
    rwlock = RWLock(loop=loop)
    N = 5
    reads = 0
    writes = 0

    async def r():
        # read until we achive write successes
        nonlocal reads, writes
        while writes < 2:
            # print("current pre-reads", reads)
            async with rwlock.reader_lock:
                reads += 1
                # print("current reads", reads)

    async def w():
        nonlocal reads, writes
        while reads == 0:
github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
def test_ctor_noloop_writer(loop):
    asyncio.set_event_loop(loop)
    rwlock = RWLock().writer_lock
    assert rwlock._lock._loop is loop
github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
async def test_read_upgrade_write_release(loop):
    rwlock = RWLock(loop=loop)
    await rwlock.writer_lock.acquire()
    await rwlock.reader_lock.acquire()
    await rwlock.reader_lock.acquire()

    await rwlock.reader_lock.acquire()
    rwlock.reader_lock.release()

    assert rwlock.writer_lock.locked

    rwlock.writer_lock.release()
    assert not rwlock.writer_lock.locked

    assert rwlock.reader.locked

    with pytest.raises(RuntimeError):
        await rwlock.writer_lock.acquire()
github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
def test_ctor_loop_writer(loop):
    rwlock = RWLock(loop=loop).writer_lock
    assert rwlock._lock._loop is loop
github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
def test_raise_error_on_with_for_reader_lock(loop, fast_track):
    rwlock = RWLock(loop=loop, fast=fast_track)
    with pytest.raises(RuntimeError):
        with rwlock.reader_lock:
            pass
github aio-libs / aiorwlock / tests / test_rwlock.py View on Github external
async def test_release_unlocked(loop):
    rwlock = RWLock(loop=loop)
    with pytest.raises(RuntimeError):
        rwlock.reader_lock.release()
github freenas / freenas / src / middlewared / middlewared / plugins / cloud_sync.py View on Github external
return dataset, any(
        (ds["mountpoint"] + "/").startswith(directory + "/")
        for ds in datasets
        if ds != dataset
    )


class _FsLockCore(aiorwlock._RWLockCore):
    def _release(self, lock_type):
        if self._r_state == 0 and self._w_state == 0:
            self._fs_manager._remove_lock(self._fs_path)

        return super()._release(lock_type)


class _FsLock(aiorwlock.RWLock):
    core = _FsLockCore


class FsLockDirection(enum.Enum):
    READ = 0
    WRITE = 1


class FsLockManager:
    _lock = _FsLock

    def __init__(self):
        self.locks = {}

    def lock(self, path, direction):
        path = os.path.normpath(path)
github aio-libs / aiorwlock / examples / simple.py View on Github external
async def go():
    rwlock = aiorwlock.RWLock()

    # acquire reader lock
    await rwlock.reader_lock.acquire()
    try:
        print('inside reader lock')

        await asyncio.sleep(0.1)
    finally:
        rwlock.reader_lock.release()

    # acquire writer lock
    await rwlock.writer_lock.acquire()
    try:
        print('inside writer lock')

        await asyncio.sleep(0.1)

aiorwlock

Read write lock for asyncio.

Apache-2.0
Latest version published 10 months ago

Package Health Score

80 / 100
Full package analysis

Similar packages