How to use the aiorwlock.__init__._ContextManagerMixin function in aiorwlock

To help you get started, we’ve selected a few aiorwlock examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github aio-libs / aiorwlock / aiorwlock / __init__.py View on Github external
@property
    def locked(self) -> bool:
        return self._lock.read_locked

    async def acquire(self) -> None:
        await self._lock.acquire_read()

    def release(self) -> None:
        self._lock.release_read()

    def __repr__(self) -> str:
        status = 'locked' if self._lock._r_state > 0 else 'unlocked'
        return ''.format(status)


class _WriterLock(_ContextManagerMixin):

    def __init__(self, lock: _RWLockCore):
        self._lock = lock

    @property
    def locked(self) -> bool:
        return self._lock.write_locked

    async def acquire(self) -> None:
        await self._lock.acquire_write()

    def release(self) -> None:
        self._lock.release_write()

    def __repr__(self) -> str:
        status = 'locked' if self._lock._w_state > 0 else 'unlocked'
github aio-libs / aiorwlock / aiorwlock / __init__.py View on Github external
# We have no use for the "as ..."  clause in the with
        # statement for locks.
        return None

    async def __aexit__(self, *args: List[Any]) -> None:
        self.release()

    async def acquire(self) -> None:
        raise NotImplementedError  # pragma: no cover

    def release(self) -> None:
        raise NotImplementedError  # pragma: no cover


# Lock objects to access the _RWLockCore in reader or writer mode
class _ReaderLock(_ContextManagerMixin):

    def __init__(self, lock: _RWLockCore) -> None:
        self._lock = lock

    @property
    def locked(self) -> bool:
        return self._lock.read_locked

    async def acquire(self) -> None:
        await self._lock.acquire_read()

    def release(self) -> None:
        self._lock.release_read()

    def __repr__(self) -> str:
        status = 'locked' if self._lock._r_state > 0 else 'unlocked'

aiorwlock

Read write lock for asyncio.

Apache-2.0
Latest version published 1 month ago

Package Health Score

85 / 100
Full package analysis