Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def load(self, filename):
"""Load proxies from file"""
with open(filename, 'r') as fin:
proxies = json.load(fin)
for protocol in proxies:
for proxy in proxies[protocol]:
self.proxies[protocol][proxy['addr']] = Proxy(
proxy['addr'], proxy['protocol'], proxy['weight'],
proxy['last_checked'])
self.addr_list[protocol].append(proxy['addr'])
candidate_proxy = proxy_scanner.proxy_queue.get(
timeout=queue_timeout)
except queue.Empty:
if proxy_scanner.is_scanning():
continue
else:
break
addr = candidate_proxy['addr']
protocol = candidate_proxy['protocol']
ret = self.is_valid(addr, protocol, val_timeout)
if self.proxy_num() >= expected_num:
self.logger.info('Enough valid proxies, thread {} exit.'
.format(threading.current_thread().name))
break
if ret['valid']:
self.add_proxy(Proxy(addr, protocol))
self.logger.info('{} ok, {:.2f}s'.format(addr, ret[
'response_time']))
else:
self.logger.info('{} invalid, {}'.format(addr, ret['msg']))