Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_multiple_timer():
    """Two timers with different delays fire in delay order, near their deadlines.

    A 0.4 s timer is started before a 0.1 s timer; the assertions require the
    0.1 s callback to be recorded first and both to land within 10 ms of
    ``start + delay``.
    """
    slow_hits = []
    fast_hits = []

    def on_slow(now, t):
        # from_nanotime presumably converts the loop timestamp to seconds
        # (the result is compared against time.time() below) -- TODO confirm.
        slow_hits.append(from_nanotime(now))

    def on_fast(now, t):
        fast_hits.append(from_nanotime(now))

    started = time.time()

    # Start order is kept: the slow timer first, then the fast one.
    slow_timer = Timer(on_slow, 0.4)
    slow_timer.start()
    fast_timer = Timer(on_fast, 0.1)
    fast_timer.start()

    run()

    slow_at, fast_at = slow_hits[0], fast_hits[0]
    assert slow_at > fast_at
    assert (started + 0.39) <= slow_at <= (started + 0.41), slow_at
    assert (started + 0.09) <= fast_at <= (started + 0.11), fast_at
def test_repeat():
    """A repeating timer keeps calling its callback until it is stopped.

    The callback records three timestamps and stops the timer on the call
    after that; the recorded values must be strictly increasing.
    """
    ticks = []

    def on_tick(now, handle):
        if len(ticks) != 3:
            ticks.append(now)
        else:
            # Three ticks already recorded: cancel further repeats.
            handle.stop()

    # Second and third arguments are presumably (initial delay, repeat
    # interval) in seconds -- TODO confirm against Timer's signature.
    repeating = Timer(on_tick, 0.01, 0.01)
    repeating.start()
    run()

    assert len(ticks) == 3
    first, second, third = ticks
    assert third > second
    assert second > first
def test_multiple_timer():
    """Timers started out of delay order still fire in delay order.

    Starts a 0.4 s timer, then a 0.1 s timer, runs the loop, and checks that
    each callback was recorded within 10 ms of its expected deadline.
    """
    def recorder(sink):
        # Build a timer callback that stores the converted timestamp in sink.
        def _record(now, t):
            sink.append(from_nanotime(now))
        return _record

    late = []
    early = []
    begin = time.time()

    t_late = Timer(recorder(late), 0.4)
    t_late.start()
    t_early = Timer(recorder(early), 0.1)
    t_early.start()
    run()

    late_ts = late[0]
    early_ts = early[0]
    assert late_ts > early_ts
    assert (begin + 0.39) <= late_ts <= (begin + 0.41), late_ts
    assert (begin + 0.09) <= early_ts <= (begin + 0.11), early_ts
def test_simple_timer():
    """A single 0.1 s timer fires once, within 10 ms of its deadline."""
    fired_at = []

    def on_fire(now, handle):
        fired_at.append(from_nanotime(now))

    started = time.time()
    single = Timer(on_fire, 0.1)
    single.start()
    run()

    delay = fired_at[0]
    earliest = started + 0.09
    latest = started + 0.11
    assert earliest <= delay <= latest, delay
def idle():
    """Yield the current tasklet and have it rescheduled as soon as possible.

    A very short timer (0.0001) is armed whose callback clears the tasklet's
    ``blocked`` flag and appends it back to the scheduler; meanwhile the
    tasklet marks itself blocked and removes itself from the run queue.
    """
    scheduler = core.get_scheduler()
    current = core.getcurrent()

    def _wake(now, handle):
        # Timer callback: make the parked tasklet runnable again.
        current.blocked = False
        scheduler.append(current)
        core.schedule()

    wakeup = timer.Timer(_wake, 0.0001)
    wakeup.start()

    # Flag as blocked only after the wake-up timer is armed, then step aside.
    current.blocked = True
    core.schedule_remove()
def after_func(d, f, *args, **kwargs):
    """Call ``f(*args, **kwargs)`` in its own tasklet once ``d`` has elapsed.

    Returns the Timer driving the delayed call; per the original contract its
    ``stop`` method can be used to cancel the call.
    """
    def _spawn(now, handle):
        # Wrap f in a fresh tasklet, bind its arguments, then yield.
        task = core.tasklet(f)
        task(*args, **kwargs)
        core.schedule()

    pending = timer.Timer(_spawn, d)
    pending.start()
    return pending
def __init__(self, interval, label=''):
    """Create the ticker and immediately start its underlying timer.

    ``interval`` is stored on the instance and passed to the Timer as both
    its second and third argument (presumably initial delay and repeat
    period -- TODO confirm); ``label`` is forwarded to the parent class.
    """
    super(Ticker, self).__init__(label=label)
    self._interval = interval

    ticking = timer.Timer(self._tick, interval, interval)
    self._timer = ticking
    ticking.start()
def sleep(seconds=0):
    """Suspend the caller until a timer of ``seconds`` fires.

    A falsy duration (``0``, ``0.0``, ``None``) returns immediately without
    creating a timer or a channel. Otherwise a one-shot Timer is started and
    the caller waits on a private channel that the timer callback sends to.

    Always returns ``None``.
    """
    if not seconds:
        return

    # NOTE(review): the unused ``sched``/``curr`` locals (results of
    # get_scheduler() / getcurrent()) were dropped; this assumes those calls
    # have no side effect that sleep relies on -- confirm.
    wakeup = channel()

    def ready(now, t):
        # The payload is irrelevant; the send only releases the receive below.
        wakeup.send(None)

    t = Timer(ready, seconds)
    t.start()

    # Wait here until ``ready`` has sent on the channel.
    wakeup.receive()