Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main():
rlist.append('m')
cg = core.tasklet(g)()
cf = core.tasklet(f)()
core.run()
rlist.append('m')
def test_simple(self):
r_list = []
def f():
r_list.append(True)
pid = spawn(f)
assert isinstance(pid, ActorRef)
assert pid.ref == 0
assert hasattr(pid.actor, 'mailbox')
sleep(0.1)
core.run()
assert r_list == [True]
assert pid.actor is None
assert pid.is_alive is False
def test_kill(self):
def f():pass
t = core.tasklet(f)()
t.kill()
assert not t.alive
def f():
start = time.time()
c.send(True)
r_list.append(start)
core.tasklet(f)()
unblocked_recv = []
for i in range(11):
time.sleep(0.01)
unblocked_recv.append(c.receive())
core.schedule()
core.run()
diff = time.time() - r_list[0]
assert len(unblocked_recv) == 11
assert diff > 0.1
def test_ticker(self):
rlist = []
def f():
ticker = Ticker(0.1)
i = 0
while True:
if i == 3: break
t = ticker.receive()
rlist.append(t)
i += 1
ticker.stop()
tf = core.tasklet(f)()
core.run()
assert len(rlist) == 3
def test_readable(self):
(r, w) = os.pipe()
ret = []
def _read(fd):
c = IOChannel(r, mode=0)
c.receive()
ret.append(os.read(fd, 10))
c.stop()
def _write(fd):
os.write(fd, b"TEST")
core.tasklet(_read)(r)
core.tasklet(_write)(w)
core.run()
assert ret == [b"TEST"]
def main():
rlist.append('m')
cg = core.tasklet(g)()
cf = core.tasklet(f)()
core.run()
rlist.append('m')
r_list = []
def f():
r_list.append(True)
t = core.tasklet(f)()
assert not hasattr(t, 'mailbox')
wrap(t)
assert isinstance(t, Actor)
assert hasattr(t, 'mailbox')
assert hasattr(t, 'ref')
pid = t.ref
assert isinstance(pid, ActorRef)
assert pid.ref == 1
core.run()
assert r_list == [True]
assert pid.actor is None
assert pid.is_alive is False
# -*- coding: utf-8 -
#
# This file is part of flower. See the NOTICE for more information
time = __import__('time').time
from flower import core
from flower.core import timer
from flower.core.util import from_nanotime
class Ticker(core.channel):
"""A Ticker holds a synchronous channel that delivers `ticks' of a
clock at intervals."""
def __init__(self, interval, label=''):
super(Ticker, self).__init__(label=label)
self._interval = interval
self._timer = timer.Timer(self._tick, interval, interval)
self._timer.start()
def _tick(self, now, h):
self.send(from_nanotime(now))
def stop(self):
self._timer.stop()
def idle():