# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_result(self):
    """Query Botometer for ``self.user_id`` and record the outcome.

    On success the ``"universal"`` entry of the response's ``"cap"``
    mapping is stored in ``self._probability`` (``None`` when either key
    is absent) and the status is set to ``BotometerStatus.READY``.

    When the lookup raises ``NoTimelineError`` the status is set to
    ``BotometerStatus.UNAVAILABLE`` and ``self._probability`` is left
    untouched.
    """
    try:
        report = self.botometer.check_account(self.user_id)
    except NoTimelineError:
        # Nothing to score: flag the account and stop here.
        self.status = BotometerStatus.UNAVAILABLE
        return

    cap_scores = report.get("cap", {})
    self._probability = cap_scores.get("universal")
    self.status = BotometerStatus.READY
def run(self):
print('Worker started')
# do some initialization here
while True:
data = self.queue.get(True)
try:
if data is None:
print('ALL FINISHED!!!!', self.conn_number)
break
print 'checking: ', data
result = self.bom._get_twitter_data(data)
json.dump(result, gzip.open(os.path.join(self.out_dir,str(data)+".gz"),"wb"))
sleep(7)
except botometer.NoTimelineError:
print 'no timeline for: ', data, ' continuing'
except TweepError as e:
print e
except KeyboardInterrupt as e:
print e
break
except Exception:
print('FAILED:: ', data)
exc_type, exc_value, exc_traceback = sys.exc_info()
print("*** print_tb:")
traceback.print_tb(exc_traceback, limit=50, file=sys.stdout)
print("*** print_exception:")
print 'finished: ', data
# Build the Botometer client from the class-level configuration.
rapid_key = cls.conf['botometer']['rapid_key']
botometer_api_url = 'https://botometer-pro.p.rapidapi.com'
# Never write the API key itself to the log (it is a credential); only
# record whether one is configured.
logger.info('Botometer RapidAPI key configured: %s', bool(rapid_key))
bon = botometer.Botometer(
    botometer_api_url=botometer_api_url,
    rapidapi_key=rapid_key,
    wait_on_ratelimit=True,
    **cls.conf['botometer']['twitter_app_credentials'])
max_retries = 3
num_retries = 0
for ms in mspreaders:
    # Up to max_retries + 1 attempts per account.
    for num_retries in range(max_retries + 1):
        result = None
        try:
            result = bon.check_account(ms.user_raw_id)
        except (TweepError, NoTimelineError) as e:
            # Non-retryable for this account: store the error in place of
            # a score so the account is still marked as processed.
            err_msg = '{}: {}'.format(
                type(e).__name__,
                getattr(e, 'msg', '') or getattr(e, 'reason', ''),
            )
            result = {'error': err_msg}
        except (Timeout, ConnectionError, HTTPError):
            # Transient network failure: give up after the last attempt,
            # otherwise back off exponentially (1s, 2s, 4s, ...).
            if num_retries >= max_retries:
                raise
            time.sleep(2**num_retries)
        if result is not None:
            ms.bot_or_not = result
            break
def run(self):
    """Run the Botometer check for ``self.user_id`` and store the result.

    The status is set to ``BotometerStatus.RUNNING`` before the lookup.
    On success it becomes ``BotometerStatus.READY`` and
    ``self._probability`` receives the ``"universal"`` entry of the
    response's ``"cap"`` mapping (``None`` when either key is absent).
    When the lookup raises ``NoTimelineError`` the status becomes
    ``BotometerStatus.UNAVAILABLE`` and ``self._probability`` is ``0.0``.
    """
    self.status = BotometerStatus.RUNNING
    try:
        report = self.botometer.check_account(self.user_id)
    except NoTimelineError:
        self.status = BotometerStatus.UNAVAILABLE
        # let's assume it's not a bot
        self._probability = 0.0
        return

    self.status = BotometerStatus.READY
    cap_scores = report.get("cap", {})
    self._probability = cap_scores.get("universal")
def botometer(follower):
    """Return the Botometer ``cap.universal`` value for *follower*.

    A fresh ``Botometer`` client is built on every call from a copy of
    ``authentication.botometer`` with rate-limit waiting enabled.

    Returns:
        The ``"universal"`` entry of the response's ``"cap"`` mapping
        (``None`` when either key is absent), or ``0.5`` when the lookup
        raises ``NoTimelineError``.
    """
    # Copy so the shared credentials mapping is not mutated.
    kwargs = authentication.botometer.copy()
    # The client's option is spelled ``wait_on_ratelimit``; the previous
    # ``wait_on_rate_limit`` key did not match it, so waiting on rate
    # limits was never actually requested.
    kwargs["wait_on_ratelimit"] = True
    api = Botometer(**kwargs)
    try:
        result = api.check_account(follower)
    except NoTimelineError:
        return 0.5  # user with no post, let's toss a coin
    return result.get("cap", {}).get("universal")