Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def f():
rlist.append('f')
return 1/0
def g():
rlist.append('bg')
core.schedule()
rlist.append('ag')
def h():
rlist.append('bh')
core.schedule()
rlist.append('ah')
tg = core.tasklet(g)()
tf = core.tasklet(f)()
th = core.tasklet(h)()
try:
core.run()
except ZeroDivisionError:
rlist.append('E')
core.schedule()
core.schedule()
assert rlist == "bg f E bh ag ah".split()
def test_autocatch_taskletexit(self):
# Tests if TaskletExit is caught correctly in core.tasklet.setup().
def f():
core.schedule()
t = core.tasklet(f)()
t.run()
t.kill()
rlist.append('f')
return 1/0
def g():
rlist.append('bg')
core.schedule()
rlist.append('ag')
def h():
rlist.append('bh')
core.schedule()
rlist.append('ah')
tg = core.tasklet(g)()
tf = core.tasklet(f)()
th = core.tasklet(h)()
try:
core.run()
# cheating, can't test for ZeroDivisionError
except ZeroDivisionError:
rlist.append('E')
core.schedule()
core.schedule()
assert rlist == "bg f E bh ag ah".split()
rlist.append('f')
def g():
rlist.append('g')
core.schedule()
def main():
rlist.append('m')
cg = core.tasklet(g)()
cf = core.tasklet(f)()
core.run()
rlist.append('m')
main()
assert core.getcurrent() is core.getmain()
assert rlist == 'm g f m'.split()
def test_init(self):
b = Broker('redis+socket:///path/to/socket')
self.assertFalse(isinstance(b, RabbitMQ))
self.assertTrue(isinstance(b, RedisSocket))
def main():
rlist.append('m')
cg = core.tasklet(g)()
cf = core.tasklet(f)()
core.run()
rlist.append('m')
def test_getruncount(self):
assert core.getruncount() == 1
def with_schedule():
assert core.getruncount() == 2
t1 = core.tasklet(with_schedule)()
assert core.getruncount() == 2
core.schedule()
def with_run():
assert core.getruncount() == 1
t2 = core.tasklet(with_run)()
core.run()
def test_simple_channel(self):
output = []
def print_(*args):
output.append(args)
def Sending(channel):
print_("sending")
channel.send("foo")
def Receiving(channel):
print_("receiving")
print_(channel.receive())
ch=core.channel()
task=core.tasklet(Sending)(ch)
# Note: the argument, schedule is taking is the value,
# schedule returns, not the task that runs next
#core.schedule(task)
core.schedule()
task2=core.tasklet(Receiving)(ch)
#core.schedule(task2)
core.schedule()
core.run()
assert output == [('sending',), ('receiving',), ('foo',)]
def test_sleep2(self):
rlist = []
def f():
sleep()
rlist.append('a')
def f1():
rlist.append('b')
core.tasklet(f)()
core.tasklet(f1)()
core.run()
assert rlist == ['b', 'a']
def test_send_counter(self):
import random
numbers = list(range(20))
random.shuffle(numbers)
def counter(n, ch):
for i in xrange(n):
core.schedule()
ch.send(n)
ch = core.channel()
for each in numbers:
core.tasklet(counter)(each, ch)
core.run()
rlist = []
while ch.balance:
rlist.append(ch.receive())
numbers.sort()
assert rlist == numbers