Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@gen.coroutine
def delete(self, id_):
yield self.sleep()
return super().delete(id_)
@gen.coroutine
def tearDown(self):
raise gen.Return(None)
@gen.coroutine
def process_emitBatch(self, seqid, iprot, oprot):
args = emitBatch_args()
args.read(iprot)
iprot.readMessageEnd()
yield gen.maybe_future(self._handler.emitBatch(args.batch))
@gen.coroutine
def _server_request_loop(self, delegate):
try:
while True:
conn = HTTP1Connection(self.stream, False,
self.params, self.context)
request_delegate = delegate.start_request(self, conn)
try:
ret = yield conn.read_response(request_delegate)
except (iostream.StreamClosedError,
iostream.UnsatisfiableReadError):
return
except _QuietException:
# This exception was already logged.
conn.close()
return
except Exception:
@gen.coroutine
def start(self):
"""Override KubeSpawner class start method."""
yield self._prepare_volumes()
_start = yield super(KubeFormSpawner, self).start()
return _start
@gen.coroutine
def post(self):
udid = self.get_argument("udid")
url = self.get_argument("url")
device = idevices[udid]
ret = yield self.app_install(device.udid, url)
if not ret['success']:
self.set_status(ret.get("status", 400)) # default bad request
self.write(ret)
@gen.coroutine
def put(self, *args, **kwargs):
try:
yield self._put(*args, **kwargs)
except Exception as e:
exception_callback(e)
self.render_error(str(e))
@gen.coroutine
def convert_to_grayscale(cls,
sender,
details,
update_image=False,
with_alpha=False):
img = yield Events.trigger(
Events.Engine.convert_to_grayscale,
sender,
details=details,
update_image=update_image,
with_alpha=with_alpha,
)
return img
def _rebuild_tornado_coroutine(func):
from tornado import gen
return gen.coroutine(func)
@gen.coroutine
def get(self):
"""GET query returns info on all installed extensions"""
if self.get_argument('refresh', False) == '1':
yield self.manager.refresh_outdated()
extensions = yield self.manager.list_extensions()
self.finish(json.dumps(extensions))