Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_pool_worker(self):
    # Drive a single PoolWorker by hand: feed it one task through the
    # outbound queue and read the answer back from the inbound queue.
    outbound = context.Queue()
    inbound = context.Queue()
    pool_worker = PoolWorker(outbound, inbound, 1)
    pool_worker.start()
    self.assertTrue(pool_worker.is_alive())

    # Task tuple as submitted here: (1, mapper, (5,), {}) -- presumably
    # (task id, coroutine function, args, kwargs); confirm against PoolWorker.
    task = (1, mapper, (5,), {})
    outbound.put_nowait(task)

    # Fixed pause to give the child process time to run the task.
    await asyncio.sleep(0.5)

    self.assertEqual(inbound.get_nowait(), (1, 10))
    # The worker was built with maxtasks == 1, so it should be gone by now.
    self.assertFalse(pool_worker.is_alive())
async def test_pool_worker(self):
    # Drive a single PoolWorker by hand: feed it one task through the
    # outbound queue and read the answer back from the inbound queue.
    outbound = context.Queue()
    inbound = context.Queue()
    pool_worker = PoolWorker(outbound, inbound, 1)
    pool_worker.start()
    self.assertTrue(pool_worker.is_alive())

    # Task tuple as submitted here: (1, mapper, (5,), {}) -- presumably
    # (task id, coroutine function, args, kwargs); confirm against PoolWorker.
    task = (1, mapper, (5,), {})
    outbound.put_nowait(task)

    # Fixed pause to give the child process time to run the task.
    await asyncio.sleep(0.5)

    self.assertEqual(inbound.get_nowait(), (1, 10))
    # The worker was built with maxtasks == 1, so it should be gone by now.
    self.assertFalse(pool_worker.is_alive())
async def test_pool_worker(self):
    # Drive a single PoolWorker by hand: feed it one task through the
    # outbound queue and read the answer back from the inbound queue.
    outbound = context.Queue()
    inbound = context.Queue()
    pool_worker = PoolWorker(outbound, inbound, 1)
    pool_worker.start()
    self.assertTrue(pool_worker.is_alive())

    # Task tuple as submitted here: (1, mapper, (5,), {}) -- presumably
    # (task id, coroutine function, args, kwargs); confirm against PoolWorker.
    task = (1, mapper, (5,), {})
    outbound.put_nowait(task)

    # Fixed pause to give the child process time to run the task.
    await asyncio.sleep(0.5)

    self.assertEqual(inbound.get_nowait(), (1, 10))
    # The worker was built with maxtasks == 1, so it should be gone by now.
    self.assertFalse(pool_worker.is_alive())
async def test_worker_join(self):
    # Case 1: an explicitly started worker -- the value produced by
    # awaiting join() is compared against the worker's pid.
    started = amp.Worker(target=sleepypid)
    started.start()
    joined_value = await started.join()
    self.assertEqual(joined_value, started.pid)

    # Case 2: the worker object is awaited directly, with no start() call
    # beforehand; the awaited value is again compared against the pid.
    unstarted = amp.Worker(target=sleepypid)
    awaited_value = await unstarted
    self.assertEqual(awaited_value, unstarted.pid)
async def test_spawn_context(self):
    # Checks amp.set_context(): an unknown name is rejected, and under the
    # "spawn" context a locally defined coroutine fails as a Worker target
    # while module-level targets keep working for both Worker and Pool.
    with self.assertRaises(ValueError):
        amp.set_context("foo")

    async def inline(x):
        return x

    amp.set_context("spawn")
    try:
        # Using the nested coroutine as a target is expected to raise
        # AttributeError (presumably because it cannot be pickled by
        # reference for a spawned child -- confirm).
        with self.assertRaises(AttributeError):
            p = amp.Worker(target=inline, args=(1,), name="test_inline")
            p.start()
            await p.join()

        # A module-level target runs normally under "spawn".
        p = amp.Worker(target=two, name="test_global")
        p.start()
        await p.join()

        # Pool.map under "spawn" must match calling mapper directly.
        values = list(range(10))
        results = [await mapper(i) for i in values]
        async with amp.Pool(2) as pool:
            self.assertEqual(await pool.map(mapper, values), results)

        self.assertEqual(p.result, 2)
    finally:
        # set_context() changes state shared by every later Worker/Pool in
        # this process; reset it so the "spawn" choice does not leak into
        # other tests, whether this test passes or fails.
        # NOTE(review): assumes None selects the platform default, as
        # multiprocessing.get_context(None) does -- confirm.
        amp.set_context(None)
async def test_worker_join(self):
    # Case 1: an explicitly started worker -- the value produced by
    # awaiting join() is compared against the worker's pid.
    started = amp.Worker(target=sleepypid)
    started.start()
    joined_value = await started.join()
    self.assertEqual(joined_value, started.pid)

    # Case 2: the worker object is awaited directly, with no start() call
    # beforehand; the awaited value is again compared against the pid.
    unstarted = amp.Worker(target=sleepypid)
    awaited_value = await unstarted
    self.assertEqual(awaited_value, unstarted.pid)
async def test_worker(self):
    # Run one Worker to completion, then inspect its final state.
    proc = amp.Worker(target=sleepypid)
    proc.start()
    await proc.join()

    # After join() the process must no longer be alive ...
    still_running = proc.is_alive()
    self.assertFalse(still_running)
    # ... and the stored result must equal the worker's pid.
    self.assertEqual(proc.result, proc.pid)
async def test_spawn_context(self):
    # Checks amp.set_context(): an unknown name is rejected, and under the
    # "spawn" context a locally defined coroutine fails as a Worker target
    # while module-level targets keep working for both Worker and Pool.
    with self.assertRaises(ValueError):
        amp.set_context("foo")

    async def inline(x):
        return x

    amp.set_context("spawn")
    try:
        # Using the nested coroutine as a target is expected to raise
        # AttributeError (presumably because it cannot be pickled by
        # reference for a spawned child -- confirm).
        with self.assertRaises(AttributeError):
            p = amp.Worker(target=inline, args=(1,), name="test_inline")
            p.start()
            await p.join()

        # A module-level target runs normally under "spawn".
        p = amp.Worker(target=two, name="test_global")
        p.start()
        await p.join()

        # Pool.map under "spawn" must match calling mapper directly.
        values = list(range(10))
        results = [await mapper(i) for i in values]
        async with amp.Pool(2) as pool:
            self.assertEqual(await pool.map(mapper, values), results)

        self.assertEqual(p.result, 2)
    finally:
        # set_context() changes state shared by every later Worker/Pool in
        # this process; reset it so the "spawn" choice does not leak into
        # other tests, whether this test passes or fails.
        # NOTE(review): assumes None selects the platform default, as
        # multiprocessing.get_context(None) does -- confirm.
        amp.set_context(None)
return x
amp.set_context("spawn")
with self.assertRaises(AttributeError):
p = amp.Worker(target=inline, args=(1,), name="test_inline")
p.start()
await p.join()
p = amp.Worker(target=two, name="test_global")
p.start()
await p.join()
values = list(range(10))
results = [await mapper(i) for i in values]
async with amp.Pool(2) as pool:
self.assertEqual(await pool.map(mapper, values), results)
self.assertEqual(p.result, 2)
async def test_pool_concurrency(self):
    # Times Pool.map for every parameter set in PERF_SETS and prints one
    # row per set: sleep, tasks, processes, concurrency, measured time.
    timings = []
    for perf_set in PERF_SETS:
        sleep, tasks, processes, concurrency = perf_set

        with Timer() as timer:
            pool_ctx = amp.Pool(processes, childconcurrency=concurrency)
            async with pool_ctx as pool:
                # One `sleep` argument per task, generated lazily.
                await pool.map(sleepy, (sleep for _ in range(tasks)))

        # timer.result is read only after the Timer block has exited.
        row = (sleep, tasks, processes, concurrency, timer.result)
        timings.append(row)

    # Blank line first so the table is separated from earlier output.
    print()
    for row in timings:
        print(*row)