Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def wakeselect ():
    """Call ``pull_trigger ()`` on every object in ``asyncore.socket_map`` that has one.

    Objects without a ``pull_trigger`` attribute are left untouched.
    """
    # Iterate over a snapshot so the map may change while triggers fire.
    # Only the registered objects are needed, not their file descriptors.
    for obj in list (asyncore.socket_map.values ()):
        if hasattr (obj, "pull_trigger"):
            obj.pull_trigger ()
map [client._fileno] = client
fds = list (map.keys ())
# maybe 2 is enough
safeguard = count * 2
while self.has_job () and safeguard:
safeguard -= 1
asyncore.loop (0.1, map, count = 1)
if safeguard % 5 == 0:
self.maintern (time.time ())
self.maintern (time.time ())
for fd in fds:
if fd not in map:
# resync
try: del asyncore.socket_map [fd]
except KeyError: pass
def mapsize ():
    """Return the number of entries currently held in ``asyncore.socket_map``."""
    return len (asyncore.socket_map)
_fetch_started = timeit.default_timer ()
# IMPORTANT: manually set
lifetime._polling = 1
# create initial workers
#_logger ("creating connection pool", "info")
target_socks = min (_workers, qsize ())
for i in range (target_socks):
_req ()
select_timeout = 1.0
if not _force_h1 and http2.MAX_HTTP2_CONCURRENT_STREAMS > 1:
# wait until all are available
while qsize ():
lifetime.lifetime_loop (select_timeout, 1)
target_socks = sum ([1 for conn in asyncore.socket_map.values () if hasattr (conn, "get_proto") and not isinstance (conn, (dns.UDPClient, dns.TCPClient)) and conn.get_proto () in H2_PROTOCOLS and conn.connected and not conn.isactive ()])
if target_socks == _workers:
#_logger ('%d connection(s) created' % target_socks, 'info')
break
# now starting
if http2.MAX_HTTP2_CONCURRENT_STREAMS == 1:
measurement = min
else:
measurement = max
while qsize () or _currents:
lifetime.lifetime_loop (select_timeout, 1)
while _concurrent > measurement (len (_currents), mapsize ()) and qsize ():
_req ()
_max_conns = max (_max_conns, mapsize ())
#print ('--', len (_currents), mapsize (), qsize ())
def keys (self):
    """Return a list snapshot of the map's keys, taken while holding ``self.lock``.

    The result is a plain list: it supports iteration, ``len ()`` and ``in``,
    and does not change when the map is modified afterwards.
    """
    with self.lock:
        # dict.keys () is a live view on Python 3. Copying it inside the lock
        # means the dict itself is only read while the lock is held.
        return list (dict.keys (self))
def values (self):
    """Return a list snapshot of the map's values, taken while holding ``self.lock``.

    The result is a plain list: it supports iteration and ``len ()``, and does
    not change when the map is modified afterwards.
    """
    with self.lock:
        # dict.values () is a live view on Python 3. Copying it inside the lock
        # means the dict itself is only read while the lock is held.
        return list (dict.values (self))
# Swap asyncore's module-level socket map for a thread_safe_socket_map, once.
# The "_socket_map" attribute doubles as the already-installed marker and
# keeps a reference to the map asyncore had before the swap.
if not hasattr (asyncore, "_socket_map"):
    asyncore._socket_map = asyncore.socket_map
    del asyncore.socket_map
    asyncore.socket_map = thread_safe_socket_map ()
def graceful_shutdown_loop ():
global _shutdown_phase
timestamp = time.time()
timeout = 1.0
map = asyncore.socket_map
while map and _shutdown_phase < 4:
time_in_this_phase = time.time() - timestamp
veto = 0
for fd,obj in list(map.items()):
try:
fn = getattr (obj,'clean_shutdown_control')
except AttributeError:
pass
else:
try:
veto = veto or fn (_shutdown_phase, time_in_this_phase)
except:
obj.handle_error()
if veto and time_in_this_phase < _shutdown_timeout:
poll_fun_wrap (timeout, map)
v = dict.items (self)
return v
def keys (self):
    """Return a list snapshot of the map's keys, taken while holding ``self.lock``.

    The result is a plain list: it supports iteration, ``len ()`` and ``in``,
    and does not change when the map is modified afterwards.
    """
    with self.lock:
        # dict.keys () is a live view on Python 3. Copying it inside the lock
        # means the dict itself is only read while the lock is held.
        return list (dict.keys (self))
def values (self):
    """Return a list snapshot of the map's values, taken while holding ``self.lock``.

    The result is a plain list: it supports iteration and ``len ()``, and does
    not change when the map is modified afterwards.
    """
    with self.lock:
        # dict.values () is a live view on Python 3. Copying it inside the lock
        # means the dict itself is only read while the lock is held.
        return list (dict.values (self))
# Swap asyncore's module-level socket map for a thread_safe_socket_map, once.
# The "_socket_map" attribute doubles as the already-installed marker and
# keeps a reference to the map asyncore had before the swap.
if not hasattr (asyncore, "_socket_map"):
    asyncore._socket_map = asyncore.socket_map
    del asyncore.socket_map
    asyncore.socket_map = thread_safe_socket_map ()
def lifetime_loop (timeout = 30.0, count = 0):
    """Poll ``asyncore.socket_map`` until it is empty or ``_shutdown_phase`` is non-zero.

    Each pass calls ``poll_fun_wrap (timeout, map)``, ticks ``tick_timer`` and
    runs ``maintern (now)`` when more than ``_maintern_interval`` seconds have
    passed since ``_last_maintern``.

    timeout -- value handed to ``poll_fun_wrap`` on every pass.
    count   -- when non-zero, stop as soon as the pass counter exceeds it,
               i.e. after ``count + 1`` passes; 0 means no pass limit.
    """
    # Only _last_maintern is rebound here. _maintern_interval and
    # _shutdown_phase are merely read, so they need no global declaration.
    global _last_maintern

    passes = 0
    socket_map = asyncore.socket_map
    while socket_map and _shutdown_phase == 0:
        poll_fun_wrap (timeout, socket_map)
        tick_timer.tick ()
        now = time.time ()
        if (now - _last_maintern) > _maintern_interval:
            maintern (now)
            # Stamp after maintern () returns so its own run time is not
            # counted toward the next interval.
            _last_maintern = time.time ()
        passes += 1
        # NOTE(review): "> count" means count = 1 yields two passes -- confirm
        # that callers passing 1 expect that.
        if count and passes > count:
            break
def maintern_zombie_channel (now, map = None):
    """Call ``handle_timeout ()`` on channels whose ``zombie_timeout`` has elapsed.

    now -- timestamp compared against each channel's ``event_time``.
    map -- mapping whose values are the channels to inspect; defaults to
           ``asyncore.socket_map`` when None.

    A channel is considered only if it has a ``handle_timeout`` attribute, and
    is treated as a zombie when ``now - event_time`` exceeds its
    ``zombie_timeout``. Each zombie increments the module-level
    ``_killed_zombies`` counter before ``handle_timeout ()`` is called; if that
    call raises an ``Exception``, the channel's ``handle_error ()`` is invoked.
    """
    global _killed_zombies

    if map is None:
        map = asyncore.socket_map

    # Sweep a snapshot of the values so the map may change during the loop.
    for channel in list (map.values ()):
        if not hasattr (channel, "handle_timeout"):
            continue
        try:
            iszombie = (now - channel.event_time) > channel.zombie_timeout
        except AttributeError:
            # Channel lacks event_time or zombie_timeout: nothing to compare.
            continue
        if not iszombie:
            continue
        _killed_zombies += 1
        try:
            channel.handle_timeout ()
        except Exception:
            # Exception, not a bare except, so KeyboardInterrupt and
            # SystemExit propagate instead of being routed to handle_error ().
            channel.handle_error ()