Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, application, scope):
self.application = guarantee_single_callable(application)
self.scope = scope
self.input_queue = asyncio.Queue()
self.output_queue = asyncio.Queue()
self.future = asyncio.ensure_future(
self.application(scope, self.input_queue.get, self.output_queue.put)
)
if message["type"] == "lifespan.startup":
workers = [
create_task(worker(str(i), queue))
for i in range(
int(getattr(settings, "DJANGO_SIMPLE_TASK_WORKERS", 1))
)
]
await send({"type": "lifespan.startup.complete"})
elif message["type"] == "lifespan.shutdown":
await queue.join()
for w in workers:
w.cancel()
await send({"type": "lifespan.shutdown.complete"})
break
else:
return await guarantee_single_callable(app)(scope, receive, send)