Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_line_reader(aio_file_maker, temp_file, uuid):
afp = await aio_file_maker(temp_file, 'w+')
writer = Writer(afp)
lines = [uuid4().hex for _ in range(1000)]
for line in lines:
await writer(line)
await writer('\n')
read_lines = []
async for line in LineReader(afp):
read_lines.append(line[:-1])
assert lines == read_lines
def test_reader_writer(temp_file, uuid):
r_file = thread_aio_file(temp_file, 'r')
w_file = thread_aio_file(temp_file, 'w')
writer = Writer(w_file)
for _ in range(100):
yield from writer(uuid)
yield from w_file.fsync()
count = 0
for async_chunk in Reader(r_file, chunk_size=len(uuid)):
chunk = yield from async_chunk
if not chunk:
break
assert chunk.decode() == uuid
count += 1
async def test_reader_writer(loop, temp_file, uuid):
r_file = thread_aio_file(temp_file, 'r')
w_file = thread_aio_file(temp_file, 'w')
writer = Writer(w_file)
for _ in range(100):
await writer(uuid)
await w_file.fsync()
async for chunk in Reader(r_file, chunk_size=len(uuid)):
assert chunk.decode() == uuid
async def test_reader_writer(aio_file_maker, temp_file, uuid):
r_file = await aio_file_maker(temp_file, 'r')
w_file = await aio_file_maker(temp_file, 'w')
writer = Writer(w_file)
for _ in range(100):
await writer(uuid)
await w_file.fsync()
async for chunk in Reader(r_file, chunk_size=len(uuid)):
assert chunk == uuid
async def test_reader_writer(aio_file_maker, temp_file, uuid):
r_file = await aio_file_maker(temp_file, 'r')
w_file = await aio_file_maker(temp_file, 'w')
writer = Writer(w_file)
for _ in range(100):
await writer(uuid)
await w_file.fsync()
count = 0
for async_chunk in Reader(r_file, chunk_size=len(uuid)):
chunk = await async_chunk
assert chunk == uuid
count += 1
assert count == 100
async def test_reader_writer(temp_file, uuid):
r_file = thread_aio_file(temp_file, 'r')
w_file = thread_aio_file(temp_file, 'w')
writer = Writer(w_file)
for _ in range(100):
await writer(uuid)
await w_file.fsync()
count = 0
for async_chunk in Reader(r_file, chunk_size=len(uuid)):
chunk = await async_chunk
assert chunk.decode() == uuid
count += 1
assert count == 100
async def test_line_reader_one_line(aio_file_maker, temp_file):
afp = await aio_file_maker(temp_file, 'w+')
writer = Writer(afp)
payload = " ".join(uuid4().hex for _ in range(1000))
await writer(payload)
read_lines = []
async for line in LineReader(afp):
read_lines.append(line)
assert payload == read_lines[0]
async def test_reader_writer2(aio_file_maker, temp_file, uuid):
r_file = await aio_file_maker(temp_file, 'r')
w_file = await aio_file_maker(temp_file, 'w')
writer = Writer(w_file)
for _ in range(100):
await writer(uuid)
await w_file.fsync()
async for chunk in Reader(r_file, chunk_size=len(uuid)):
assert chunk == uuid