Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): this is an excerpt from the body of a method; `self`,
# `consumers` and `run_tasks` are defined outside the visible lines, and the
# original indentation has been lost.
# Add pulse listener for task results.
if self.pulse:
# Queue whatever `self.pulse.run()` returns (presumably an awaitable --
# TODO confirm) so that it is handed to `run_tasks` below.
consumers.append(self.pulse.run())
# Add communitytc pulse listener for test selection results.
if self.community_pulse:
consumers.append(self.community_pulse.run())
# Start the web server in its own process
if self.webserver:
self.webserver.start()
if consumers:
# Run all tasks concurrently
run_tasks(consumers)
else:
# Keep the web server process running
# `run_forever()` blocks here until `stop()` is called on the event loop.
asyncio.get_event_loop().run_forever()
# Make sure any pending task is run.
# NOTE(review): `asyncio.Task.all_tasks()` was deprecated in Python 3.7 and
# removed in Python 3.9; `asyncio.all_tasks()` is the documented replacement.
# Confirm which Python version this code targets.
run_tasks(asyncio.Task.all_tasks())
# Stop the webserver when other async processes are stopped
if self.webserver:
self.webserver.stop()
# NOTE(review): the statements from here to the end repeat the
# `community_pulse` / web server / `run_tasks` sequence above verbatim. This
# looks like a duplicated paste or extraction artifact -- confirm against the
# original file before relying on it (as written, `run()` would be queued and
# the web server started/stopped a second time).
if self.community_pulse:
consumers.append(self.community_pulse.run())
# Start the web server in its own process
if self.webserver:
self.webserver.start()
if consumers:
# Run all tasks concurrently
run_tasks(consumers)
else:
# Keep the web server process running
asyncio.get_event_loop().run_forever()
# Make sure any pending task is run.
# NOTE(review): same removed-in-3.9 `asyncio.Task.all_tasks()` call as above.
run_tasks(asyncio.Task.all_tasks())
# Stop the webserver when other async processes are stopped
if self.webserver:
self.webserver.stop()