Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_repr(loop):
rwlock = RWLock(loop=loop)
assert 'RWLock' in rwlock.__repr__()
assert 'WriterLock: [unlocked' in rwlock.__repr__()
assert 'ReaderLock: [unlocked' in rwlock.__repr__()
# reader lock __repr__
await rwlock.reader_lock.acquire()
assert 'ReaderLock: [locked]' in rwlock.__repr__()
rwlock.reader_lock.release()
assert 'ReaderLock: [unlocked]' in rwlock.__repr__()
# writer lock __repr__
await rwlock.writer_lock.acquire()
assert 'WriterLock: [locked]' in rwlock.__repr__()
rwlock.writer_lock.release()
assert 'WriterLock: [unlocked]' in rwlock.__repr__()
async def test_get_write_then_read_and_write_again(loop):
rwlock = RWLock(loop=loop)
rl = rwlock.reader
wl = rwlock.writer
f = loop.create_future()
writes = []
async def get_write_lock():
await f
with should_fail(.1, loop):
async with wl:
assert wl.locked
writes.append('should not be here')
ensure_future(get_write_lock(), loop=loop)
async with wl:
async def test_writer_success_with_statement(loop):
# Verify that a writer can get access
rwlock = RWLock(loop=loop)
N = 5
reads = 0
writes = 0
async def r():
# read until we achive write successes
nonlocal reads, writes
while writes < 2:
# print("current pre-reads", reads)
async with rwlock.reader_lock:
reads += 1
# print("current reads", reads)
async def w():
nonlocal reads, writes
while reads == 0:
def test_ctor_noloop_writer(loop):
asyncio.set_event_loop(loop)
rwlock = RWLock().writer_lock
assert rwlock._lock._loop is loop
async def test_read_upgrade_write_release(loop):
rwlock = RWLock(loop=loop)
await rwlock.writer_lock.acquire()
await rwlock.reader_lock.acquire()
await rwlock.reader_lock.acquire()
await rwlock.reader_lock.acquire()
rwlock.reader_lock.release()
assert rwlock.writer_lock.locked
rwlock.writer_lock.release()
assert not rwlock.writer_lock.locked
assert rwlock.reader.locked
with pytest.raises(RuntimeError):
await rwlock.writer_lock.acquire()
def test_ctor_loop_writer(loop):
rwlock = RWLock(loop=loop).writer_lock
assert rwlock._lock._loop is loop
def test_raise_error_on_with_for_reader_lock(loop, fast_track):
rwlock = RWLock(loop=loop, fast=fast_track)
with pytest.raises(RuntimeError):
with rwlock.reader_lock:
pass
async def test_release_unlocked(loop):
rwlock = RWLock(loop=loop)
with pytest.raises(RuntimeError):
rwlock.reader_lock.release()
def should_fail(timeout, loop):
task = current_task(loop)
handle = loop.call_later(timeout, task.cancel)
try:
yield
except asyncio.CancelledError:
handle.cancel()
return
else:
msg = 'Inner task expected to be cancelled: {}'.format(task)
pytest.fail(msg)
async def task():
tid = current_task(loop=self._loop)
self.started.append(tid)
try:
await f()
finally:
self.finished.append(tid)
while not self._can_exit:
await asyncio.sleep(0.01, loop=self._loop)