async def test_parallel_writer(temp_file, uuid):
    w_file = thread_aio_file(temp_file, 'w')
    r_file = thread_aio_file(temp_file, 'r')

    # Queue 2000 writes at non-overlapping offsets, then wait for all of them.
    futures = list()

    for i in range(2000):
        futures.append(w_file.write(uuid, i * len(uuid), 0))

    await asyncio.wait(futures)
    await w_file.fsync()

    # Read the file back chunk by chunk; an empty chunk marks end of file.
    count = 0

    for async_chunk in Reader(r_file, chunk_size=len(uuid)):
        chunk = await async_chunk

        if not chunk:
            break

        assert chunk.decode() == uuid
        count += 1

    assert count == 2000
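# The tests above also assume `temp_file` and `uuid` pytest fixtures, plus the
# Reader, Writer and thread_aio_file names from the aiofile package or the
# project's own test helpers. A minimal sketch of fixtures that would satisfy
# them (the bodies below are assumptions, not the project's actual conftest):

import uuid as uuid_module

import pytest


@pytest.fixture
def temp_file(tmp_path):
    # tmp_path is pytest's built-in temporary-directory fixture.
    path = tmp_path / "data.bin"
    path.touch()
    return str(path)


@pytest.fixture
def uuid():
    # Random string payload that the tests write repeatedly and compare.
    return str(uuid_module.uuid4())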
def test_parallel_writer(temp_file, uuid):
    w_file = thread_aio_file(temp_file, 'w')
    r_file = thread_aio_file(temp_file, 'r')

    futures = list()

    for i in range(2000):
        futures.append(w_file.write(uuid, i * len(uuid), 0))

    yield from asyncio.wait(futures)
    yield from w_file.fsync()

    count = 0

    for async_chunk in Reader(r_file, chunk_size=len(uuid)):
        chunk = yield from async_chunk

        if not chunk:
            break

        assert chunk.decode() == uuid
        count += 1

    assert count == 2000
def test_reader_writer(temp_file, uuid):
    r_file = thread_aio_file(temp_file, 'r')
    w_file = thread_aio_file(temp_file, 'w')

    writer = Writer(w_file)

    for _ in range(100):
        yield from writer(uuid)

    yield from w_file.fsync()

    count = 0

    for async_chunk in Reader(r_file, chunk_size=len(uuid)):
        chunk = yield from async_chunk

        if not chunk:
            break

        assert chunk.decode() == uuid
        count += 1

    assert count == 100
async def test_parallel_writer(aio_file_maker, temp_file, uuid):
    w_file = await aio_file_maker(temp_file, 'w')
    r_file = await aio_file_maker(temp_file, 'r')

    futures = list()

    for i in range(1000):
        futures.append(w_file.write(uuid, i * len(uuid)))

    # Shuffle so the writes are awaited in arbitrary order; the explicit
    # offsets must still put every chunk in the right place.
    shuffle(futures)

    await asyncio.gather(*futures)
    await w_file.fsync()

    count = 0
    async for chunk in Reader(r_file, chunk_size=len(uuid)):
        assert chunk == uuid
        count += 1

    assert count == 1000
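# `aio_file_maker` is awaited before use, which fits aiofile releases where
# AIOFile objects are awaitable and awaiting them opens the file. One
# hypothetical fixture shape (an assumption, not the project's real conftest):

import pytest

from aiofile import AIOFile


@pytest.fixture
def aio_file_maker():
    def maker(path, mode):
        # Return the awaitable AIOFile so tests can do:
        #     afp = await aio_file_maker(path, mode)
        return AIOFile(path, mode)

    return maker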
async def test_reader_writer(loop, temp_file, uuid):
    r_file = thread_aio_file(temp_file, 'r')
    w_file = thread_aio_file(temp_file, 'w')

    writer = Writer(w_file)

    for _ in range(100):
        await writer(uuid)

    await w_file.fsync()

    async for chunk in Reader(r_file, chunk_size=len(uuid)):
        assert chunk.decode() == uuid
async def test_reader_writer(aio_file_maker, temp_file, uuid):
    r_file = await aio_file_maker(temp_file, 'r')
    w_file = await aio_file_maker(temp_file, 'w')

    writer = Writer(w_file)

    for _ in range(100):
        await writer(uuid)

    await w_file.fsync()

    count = 0

    for async_chunk in Reader(r_file, chunk_size=len(uuid)):
        chunk = await async_chunk

        # Without this check the loop never ends: an empty chunk signals EOF,
        # matching the other Reader-based tests above.
        if not chunk:
            break

        assert chunk == uuid
        count += 1

    assert count == 100
# Line-reader constructor: wraps a Reader and picks a bytes or str buffer and
# line separator depending on whether the file was opened in binary mode.
def __init__(self, aio_file: AIOFile, offset: int = 0,
             chunk_size: int = 255, line_sep='\n'):
    self.__reader = Reader(aio_file, chunk_size=chunk_size, offset=offset)
    self._buffer = io.BytesIO() if aio_file.mode.binary else io.StringIO()
    self.linesep = (
        line_sep.encode() if self.__reader.file.mode.binary else line_sep
    )
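# Assuming this constructor belongs to aiofile's LineReader, which in recent
# releases also supports async iteration, line-by-line reading looks roughly
# like this sketch:

import asyncio

from aiofile import AIOFile, LineReader


async def print_lines(path):
    # Open the file asynchronously and stream it back one line at a time.
    async with AIOFile(path, "r") as afp:
        async for line in LineReader(afp):
            print(line.rstrip())


asyncio.run(print_lines("example.txt"))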