Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def switch(self):
if not self.running:
self._runtask = tasklet(self.run)()
getcurrent().remove()
self._runtask.switch()
def _channel_action(self, arg, d):
"""
d == -1 : receive
d == 1 : send
the original CStackless has an argument 'stackl' which is not used
here.
'target' is the peer tasklet to the current one
"""
assert abs(d) == 1
do_schedule = False
curr = getcurrent()
source = ChannelWaiter(curr, get_scheduler(), arg)
if d > 0:
if not self.capacity:
cando = self.balance < 0
else:
cando = len(self.recvq) <= self.capacity
dir = d
else:
if not self.capacity:
cando = self.balance > 0
else:
cando = len(self.sendq) <= self.capacity
dir = 0
if _channel_callback is not None:
if d > 0:
if not self.capacity:
cando = self.balance < 0
else:
cando = len(self.recvq) <= self.capacity
dir = d
else:
if not self.capacity:
cando = self.balance > 0
else:
cando = len(self.sendq) <= self.capacity
dir = 0
if _channel_callback is not None:
with self._lock:
_channel_callback(self, getcurrent(), dir, not cando)
if cando:
# there is somebody waiting
try:
target = self.dequeue(d)
except IndexError:
# capacity is not None but nobody is waiting
if d > 0:
self.enqueue(dir, ChannelWaiter(None, None, arg))
return None
source.arg, target.arg = target.arg, source.arg
if target.task is not None:
if self.schedule_all:
target.scheduler.unblock(target.task)
do_schedule = True
def idle(self, handle):
if getcurrent() is self._runtask:
schedule()