Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_manager_instance(loop, dbi, tmpdir):
    """Yield a running file ``Manager`` backed by temporary directories.

    Creates ``files`` and ``watch`` directories under ``tmpdir``, builds a
    ``virtool.files.manager.Manager`` on a fresh thread pool, and spawns
    ``manager.run()`` on a new aiojobs scheduler before yielding the manager.

    Teardown closes the scheduler and then shuts the thread pool down, even
    if setup fails part-way or the generator is closed early.

    NOTE(review): presumably registered as a pytest fixture by a decorator
    outside this excerpt -- confirm.
    """
    files_path = str(tmpdir.mkdir("files"))
    watch_path = str(tmpdir.mkdir("watch"))

    executor = concurrent.futures.ThreadPoolExecutor()
    try:
        scheduler = loop.run_until_complete(aiojobs.create_scheduler())
        try:
            manager = virtool.files.manager.Manager(executor, dbi, files_path, watch_path)
            loop.run_until_complete(scheduler.spawn(manager.run()))
            yield manager
        finally:
            # Close the scheduler so the spawned ``manager.run()`` job is stopped.
            loop.run_until_complete(scheduler.close())
    finally:
        # Release the pool's worker threads; ``wait=False`` keeps teardown
        # from blocking on any work that is still in flight.
        executor.shutdown(wait=False)
async def test_nested_application_separate_scheduler(test_client):
    """A sub-application with its own aiojobs setup uses its own scheduler.

    Both the parent and the nested application are set up with aiojobs; a
    request routed to the nested application must resolve to the nested
    application's scheduler rather than the parent's.
    """
    parent_app = web.Application()
    aiojobs_setup(parent_app)

    nested_app = web.Application()
    aiojobs_setup(nested_app)

    class MyView(web.View):
        async def get(self):
            # A failed assertion here surfaces to the client as a non-200 status.
            request_scheduler = get_scheduler_from_request(self.request)
            assert request_scheduler != get_scheduler_from_app(parent_app)
            assert (get_scheduler_from_request(self.request)
                    == get_scheduler_from_app(nested_app))
            return web.Response()

    nested_app.router.add_route("*", "/", MyView)
    parent_app.add_subapp("/sub/", nested_app)

    client = await test_client(parent_app)
    response = await client.get("/sub/")
    assert response.status == 200
async def get(self):
    """Assert the request's scheduler differs from ``app``'s and equals ``app2``'s."""
    request_scheduler = get_scheduler_from_request(self.request)
    assert request_scheduler != get_scheduler_from_app(app)
    assert (get_scheduler_from_request(self.request)
            == get_scheduler_from_app(app2))
    return web.Response()
async def test_atomic_from_view(test_client):
    """An ``@atomic`` class-based view handler responds and leaves no jobs behind.

    After a successful request, the application's scheduler must report
    zero active and zero pending jobs.
    """
    class MyView(web.View):
        @atomic
        async def get(self):
            return web.Response()

    app = web.Application()
    app.router.add_route("*", "/", MyView)
    aiojobs_setup(app)

    client = await test_client(app)
    response = await client.get('/')
    assert response.status == 200

    app_scheduler = get_scheduler_from_app(app)
    assert app_scheduler.active_count == 0
    assert app_scheduler.pending_count == 0
async def get(self):
    """Assert the request's scheduler equals the one registered on ``app``."""
    request_scheduler = get_scheduler_from_request(self.request)
    assert request_scheduler == get_scheduler_from_app(app)
    return web.Response()
async def handler(request):
    """Expect ``get_scheduler`` to raise ``RuntimeError`` for this request."""
    with pytest.raises(RuntimeError):
        get_scheduler(request)

    response = web.Response()
    return response
async def get(self):
    """Assert the request's scheduler differs from ``app``'s and equals ``app2``'s."""
    request_scheduler = get_scheduler_from_request(self.request)
    assert request_scheduler != get_scheduler_from_app(app)
    assert (get_scheduler_from_request(self.request)
            == get_scheduler_from_app(app2))
    return web.Response()
# NOTE(review): the enclosing function and any surrounding loop are outside
# this excerpt; presumably this request runs repeatedly while tag_list_url is
# set -- confirm against the full source.
# Fetch the tag listing at the current tag_list_url and collect its tags.
async with sess.get(tag_list_url,
**rqst_args) as resp:
data = json.loads(await resp.read())
if 'tags' in data:
# sometimes there are dangling image names in the hub.
# NOTE(review): if the 'tags' key can be present with a null value,
# extend() would raise TypeError -- confirm against the registry's responses.
tags.extend(data['tags'])
# Clear the URL; it is only set again when the response carries a 'next' link.
tag_list_url = None
next_page_link = resp.links.get('next')
if next_page_link:
next_page_url = next_page_link['url']
# Rebuild the URL from registry_url using only the path and query of the
# next-page link.
tag_list_url = (
registry_url
.with_path(next_page_url.path)
.with_query(next_page_url.query)
)
# Spawn one _scan_tag job per collected tag on a scheduler created with limit=4.
scheduler = await aiojobs.create_scheduler(limit=4)
try:
jobs = await asyncio.gather(*[
scheduler.spawn(_scan_tag(sess, rqst_args, image, tag))
for tag in tags])
# Wait for every spawned job to finish.
await asyncio.gather(*[job.wait() for job in jobs])
finally:
# Close the scheduler whether or not the scans succeeded.
await scheduler.close()
async def _scan_image(sess, image):
    """Scan every tag of ``image`` found in the registry.

    Logs in to the registry for pull access to ``image``, fetches the
    image's tag list, then spawns one ``_scan_tag`` job per tag on an
    aiojobs scheduler created with ``limit=8`` and waits for all of them.
    The scheduler is always closed afterwards.

    ``sess`` is the HTTP client session used for the registry requests
    (presumably an aiohttp ``ClientSession`` -- confirm); ``image`` is the
    repository name interpolated into the registry paths.
    """
    rqst_args = await registry_login(
        sess, registry_url,
        credentials, f'repository:{image}:pull')
    rqst_args['headers'].update(**base_hdrs)

    tags = []
    async with sess.get(registry_url / f'v2/{image}/tags/list',
                        **rqst_args) as resp:
        data = json.loads(await resp.read())
        # sometimes there are dangling image names in the hub: the 'tags'
        # key may be missing, or present with a null value, in which case
        # there is nothing to scan.
        if 'tags' in data and data['tags'] is not None:
            tags.extend(data['tags'])

    scheduler = await aiojobs.create_scheduler(limit=8)
    try:
        jobs = await asyncio.gather(*[
            scheduler.spawn(_scan_tag(sess, rqst_args, image, tag))
            for tag in tags])
        # Wait until every spawned tag scan has finished.
        await asyncio.gather(*[job.wait() for job in jobs])
    finally:
        await scheduler.close()
def _init_subapp(pkg_name: str,
                 root_app: web.Application,
                 subapp: web.Application,
                 global_middlewares: Iterable[WebMiddleware]) -> None:
    """Prepare ``subapp`` and mount it on ``root_app``.

    Registers the ``on_prepare`` response hook, arranges for the public
    interface objects to be copied into the sub-application as its first
    startup handler, sets up aiojobs with the root application's
    ``scheduler_opts``, mounts the sub-application under its prefix, and
    appends ``global_middlewares`` to the root application's middlewares.

    The mount prefix is ``subapp['prefix']`` when present; otherwise it is
    the last dotted component of ``pkg_name`` with underscores replaced by
    hyphens.
    """
    subapp.on_response_prepare.append(on_prepare)

    async def _copy_public_interface_objs(subapp: web.Application):
        # Allow subapp's access to the root app properties.
        # These are the public APIs exposed to plugins as well.
        for name in public_interface_objs:
            subapp[name] = public_interface_objs[name]

    # We must copy the public interface prior to all user-defined startup signal handlers.
    subapp.on_startup.insert(0, _copy_public_interface_objs)

    default_prefix = pkg_name.rsplit('.', 1)[-1].replace('_', '-')
    prefix = subapp.get('prefix', default_prefix)

    scheduler_opts = root_app['scheduler_opts']
    aiojobs.aiohttp.setup(subapp, **scheduler_opts)

    root_app.add_subapp('/' + prefix, subapp)
    root_app.middlewares.extend(global_middlewares)