Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_sleep(self):
    """A tasklet that sleeps must finish after one that does not.

    Two tasklets are queued in order: the first calls ``sleep(0.2)`` before
    recording ``'a'``, the second records ``'b'`` immediately. After
    ``core.run()`` returns, ``'b'`` is expected ahead of ``'a'``.
    """
    order = []

    def sleeper():
        sleep(0.2)
        order.append('a')

    def immediate():
        order.append('b')

    # Queue the sleeping tasklet first so the result depends on the sleep,
    # not on creation order.
    for job in (sleeper, immediate):
        core.tasklet(job)()
    core.run()

    assert order == ['b', 'a']
def test_readable(self):
    """Check that a tasklet can read pipe data after waiting on an IOChannel.

    A reader tasklet opens an ``IOChannel`` on the read end of a pipe with
    ``mode=0``, blocks on ``receive()``, then reads up to 10 bytes. A writer
    tasklet writes ``b"TEST"`` to the write end. After ``core.run()`` the
    reader must have collected exactly ``[b"TEST"]``.

    Both pipe descriptors are closed when the test ends, whether it passes
    or fails, so repeated runs do not leak file descriptors.
    """
    (r, w) = os.pipe()
    ret = []

    def _read(fd):
        # Use the descriptor handed to the tasklet rather than the
        # closed-over ``r`` so the helper only depends on its argument.
        c = IOChannel(fd, mode=0)
        try:
            c.receive()
            ret.append(os.read(fd, 10))
        finally:
            # Stop the channel even if receive/read raised.
            c.stop()

    def _write(fd):
        os.write(fd, b"TEST")

    try:
        core.tasklet(_read)(r)
        core.tasklet(_write)(w)
        core.run()
        assert ret == [b"TEST"]
    finally:
        # os.pipe() descriptors are not released automatically.
        os.close(r)
        os.close(w)
def test_schedule_return_value(self):
    """``core.schedule(val)`` should hand ``val`` back to the caller.

    Two tasklets are started with 10 and 5; each asserts that the value
    returned by ``core.schedule`` equals the one it passed in.
    """
    def check_echo(val):
        assert core.schedule(val) == val

    # A fresh tasklet is created for every argument, in the original order.
    for arg in (10, 5):
        core.tasklet(check_echo)(arg)
    core.run()
def test_schedule_return_value(self):
    """Verify that ``core.schedule`` returns the value it was given.

    Each of the two tasklets passes its argument to ``core.schedule`` and
    asserts that the same value comes back.
    """
    # NOTE(review): a method with this exact name appears twice in the
    # visible excerpt; if both live in the same class, the later definition
    # replaces the earlier one -- confirm against the full file.
    def task(expected):
        got = core.schedule(expected)
        assert got == expected

    first = core.tasklet(task)
    first(10)
    second = core.tasklet(task)
    second(5)
    core.run()
# NOTE(review): the enclosing test's `def` line and the creation of `r1` are
# not visible in this excerpt; `r1` is presumably a list defined just above
# -- confirm against the full file.
def f(now, t):
# Passed to the 0.4 Timer below; records from_nanotime(now) into r1.
r1.append(from_nanotime(now))
r2 = []
def f1(now, t):
# Passed to the 0.1 Timer below; records from_nanotime(now) into r2.
r2.append(from_nanotime(now))
# Reference wall-clock time taken before either timer is started.
now = time.time()
# Timer delays are presumably in seconds, given the tolerances asserted
# below -- TODO confirm.
t = Timer(f, 0.4)
t.start()
t1 = Timer(f1, 0.1)
t1.start()
run()
# The value recorded for the 0.4 timer must be later than the 0.1 timer's.
assert r1[0] > r2[0]
# Each recorded value must lie within 0.01 of `now` plus its timer delay.
assert (now + 0.39) <= r1[0] <= (now + 0.41), r1[0]
assert (now + 0.09) <= r2[0] <= (now + 0.11), r2[0]
def test_ticker(self):
    """Receive three values from a ``Ticker(0.1)`` and then stop it.

    A single tasklet creates the ticker, calls ``receive()`` three times,
    stores each result, and stops the ticker. After ``core.run()`` exactly
    three values must have been collected.
    """
    received = []

    def consume():
        ticker = Ticker(0.1)
        for _ in range(3):
            received.append(ticker.receive())
        ticker.stop()

    core.tasklet(consume)()
    core.run()

    assert len(received) == 3
def test_balance_send(self):
    """A send that nobody receives should leave the channel balance at 1.

    One tasklet sends ``"foo"`` on a fresh channel and no tasklet ever
    receives from it; after ``core.run()`` the channel's ``balance`` is
    expected to be 1.
    """
    chan = core.channel()

    def sender(target):
        target.send("foo")

    core.tasklet(sender)(chan)
    core.run()

    assert chan.balance == 1