Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
rlist.append('f')
def g():
    """Record ``'g'`` in ``rlist`` and then call ``core.schedule()``."""
    rlist.append('g')
    core.schedule()
def main():
    """Run the two-tasklet scenario, bracketed by ``'m'`` markers.

    Appends ``'m'`` to ``rlist``, creates a tasklet for ``g`` and then one
    for ``f`` (in that order), calls ``core.run()``, and appends ``'m'``
    again once ``core.run()`` has returned.
    """
    rlist.append('m')

    # g's tasklet is created before f's; both references are held in
    # locals while core.run() executes.
    task_g = core.tasklet(g)()
    task_f = core.tasklet(f)()

    core.run()
    rlist.append('m')
# Drive the scenario, then check that control is back on the main tasklet
# and that the markers were recorded in the order m, g, f, m.
main()
assert core.getcurrent() is core.getmain()
assert rlist == ['m', 'g', 'f', 'm']
def self():
    """Return whatever ``core.getcurrent()`` reports as the current tasklet."""
    current = core.getcurrent()
    return current
def __init__(self, addr=('0.0.0.0', 0)):
    """Initialise listener state and bind a handler to ``addr``.

    :param addr: value handed to the handler's ``bind``; defaults to
        ``('0.0.0.0', 0)``.
    """
    # coroutines waiting for a connection are queued here
    self.listeners = deque()

    self.uv = uv_server()
    self.sched = get_scheduler()
    self.task = getcurrent()
    self.listening = False

    # the handler is built from HANDLER_CLASS on the uv server's loop and
    # bound to the requested address straight away
    handler = self.HANDLER_CLASS(self.uv.loop)
    self.handler = handler
    handler.bind(addr)
def flush():
    """Pass the current tasklet through ``maybe_wrap`` and call its ``flush()``."""
    maybe_wrap(core.getcurrent()).flush()
def idle():
    """Flag the current tasklet as blocked and arrange for it to be re-queued.

    A ``timer.Timer`` is started with a callback and ``0.0001``
    (presumably a delay in seconds -- confirm against ``timer.Timer``).
    The current tasklet is then marked ``blocked`` and
    ``core.schedule_remove()`` is called. When the timer invokes the
    callback, it clears ``blocked``, appends the tasklet to the scheduler
    and calls ``core.schedule()``.
    """
    scheduler = core.get_scheduler()
    current = core.getcurrent()

    def wake(now, h):
        # timer callback: unblock the tasklet captured above and queue it
        current.blocked = False
        scheduler.append(current)
        core.schedule()

    timer.Timer(wake, 0.0001).start()

    current.blocked = True
    core.schedule_remove()
def __init__(self):
    """Store the current task, the uv server, the scheduler and a new channel."""
    # results of the module-level accessors, captured at construction time
    self.task = getcurrent()
    self.uv = uv_server()
    self.sched = get_scheduler()

    # a fresh channel() instance owned by this object
    self.c = channel()
# Ensure the current tasklet is wrapped (i.e. has a ``mailbox``), try to
# record its ``ref`` in a ``links`` list, then delegate to ``cls.spawn``.
# Returns whatever ``cls.spawn(func, *args, **kwargs)`` returns.
def spawn_link(cls, func, *args, **kwargs):
curr = core.getcurrent()
# A tasklet without a ``mailbox`` attribute is replaced by ``cls.wrap(curr)``.
if not hasattr(curr, 'mailbox'):
curr = cls.wrap(curr)
# NOTE(review): ``self`` is not bound by this signature (the first parameter
# is ``cls``); any ``self`` resolved here would have to come from an outer
# scope. Confirm which object's ``links`` was meant to be updated.
# NOTE(review): ``operator.indexOf`` raises ValueError when the item is
# absent instead of returning a negative index, so the ``< 0`` test looks
# like it can never be true -- presumably a ``not in`` check was intended.
if operator.indexOf(self.links, curr.ref) < 0:
self.links.append(curr.ref)
return cls.spawn(func, *args, **kwargs)
# NOTE(review): fragment -- the enclosing definition's header is outside this
# view; ``family``, ``nodelay``, ``sock``, ``addr`` and ``kwargs`` are bound
# earlier.
# For IPv4 sockets with ``nodelay`` set, turn on the TCP_NODELAY option.
if family == socket.AF_INET and nodelay:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.bind(addr)
# setblocking(0) puts the socket in non-blocking mode.
sock.setblocking(0)
fd = sock.fileno()
# Keep the socket, its file descriptor and the bound address on the instance.
self.sock = sock
self.fd = fd
self.addr = addr
# Listen backlog: taken from the ``backlog`` keyword, 128 when not given.
self.backlog = kwargs.get('backlog', 128)
# Snapshot of the socket module's default timeout at construction time.
self.timeout = socket.getdefaulttimeout()
self.uv = uv_server()
# No poller yet; presumably created later -- confirm against the rest of
# the class.
self.poller = None
self.listeners = deque()
self.task = getcurrent()
# start to listen
self.sock.listen(self.backlog)
def send(dest, msg):
    """Send ``msg`` from the current tasklet to ``dest``.

    ``dest`` may be a string, in which case it is looked up in
    ``registry``; a ``core.tasklet``, in which case it is passed through
    ``maybe_wrap``; or anything else, which is used unchanged.
    """
    sender = maybe_wrap(core.getcurrent())

    # resolve the destination to the object handed to Message
    if isinstance(dest, six.string_types):
        target = registry[dest]
    elif isinstance(dest, core.tasklet):
        target = maybe_wrap(dest)
    else:
        target = dest

    Message(sender, target, msg).send()