Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def g():
    """Append 'gb' to ``rlist``, call ``core.schedule()``, then append 'ga'."""
    rlist.append('gb')
    # The second marker is recorded only after core.schedule() returns.
    core.schedule()
    rlist.append('ga')
def test_local2(self):
    """Assert that a tasklet's writes to a ``local`` do not leak back here.

    ``a`` is set to 1 on the ``local`` in this context. The tasklet body is
    expected to hit ``AttributeError`` when reading ``a``, then set its own
    ``a = 2`` and read it back. Afterwards the value seen here must
    still be 1.
    """
    d = local()
    d.a = 1
    outcomes = []

    def worker():
        # Bare attribute read: only its AttributeError side effect matters.
        try:
            d.a
        except AttributeError:
            outcomes.append(True)

        d.a = 2
        if d.a == 2:
            outcomes.append(True)

    core.tasklet(worker)()
    core.schedule()

    assert outcomes == [True, True]
    assert d.a == 1
def task(val):
    """Pass ``val`` to ``core.schedule()`` and assert the same value is returned."""
    returned = core.schedule(val)
    assert returned == val
def test_schedule_return(self):
    """Assert what ``core.schedule()`` returns with and without an argument.

    With no argument the result must be ``core.getmain()``; with an argument
    the result must equal that argument.
    """
    def noop():
        pass

    # The created tasklets stay bound to locals, as in the original test.
    first = core.tasklet(noop)()
    result = core.schedule()
    assert result is core.getmain()

    second = core.tasklet(noop)()
    result = core.schedule('test')
    assert result == 'test'
def test_local(self):
    """Assert that an attribute set on a ``local`` here is absent in a tasklet.

    ``a`` is assigned in this context; the tasklet body is expected to raise
    ``AttributeError`` when it reads ``a``, which is recorded as one ``True``.
    """
    d = local()
    d.a = 1
    outcomes = []

    def worker():
        # Bare attribute read: only its AttributeError side effect matters.
        try:
            d.a
        except AttributeError:
            outcomes.append(True)

    core.tasklet(worker)()
    core.schedule()

    assert outcomes == [True]
def h():
    """Append 'hb' to ``rlist``, call ``core.schedule()``, then append 'ha'."""
    rlist.append('hb')
    # The second marker is recorded only after core.schedule() returns.
    core.schedule()
    rlist.append('ha')
def Loop(i):
    """Call ``core.schedule()`` three times, reporting ``i`` after each call.

    After every ``core.schedule()`` call, ``print_("schedule", i)`` is invoked.
    """
    for _round in range(3):
        core.schedule()
        print_("schedule", i)
# Callback taking (now, h); neither argument is used in the visible lines.
# It clears `curr.blocked`, appends `curr` to `sched`, and core.schedule()
# is called afterwards.
# NOTE(review): this excerpt lost its indentation, so the extent of the body
# is unclear -- in particular whether the final core.schedule() belongs to
# ready() or to the enclosing scope. Confirm against the original file.
def ready(now, h):
# `curr` and `sched` are not defined in this excerpt; presumably they are
# captured from the enclosing scope -- TODO confirm.
curr.blocked = False
sched.append(curr)
core.schedule()
def _on_connection(self, client, addr):
    """Deliver a newly accepted client to a waiting listener, if there is one.

    When ``self.listeners`` is empty, ``NoMoreListener`` is thrown into
    ``self.task``. Otherwise the listener at the left end of
    ``self.listeners`` is removed, ``self.uv.wakeup()`` is called, and a
    ``SockConn`` built from ``client``, ``self.addr`` and ``addr`` is sent on
    the listener's channel ``c`` as ``(conn, None)``, followed by a call to
    ``schedule()``.
    """
    if len(self.listeners) == 0:
        # we should probably do something there to drop connections
        self.task.throw(NoMoreListener)
        return

    waiting = self.listeners.popleft()
    self.uv.wakeup()

    # return a new connection object to the listener
    conn = SockConn(client, self.addr, addr)
    waiting.c.send((conn, None))
    schedule()