Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def raise_rate_limit_once(self, arg, kwarg=0):
if self.first_run:
self.first_run = False
raise tweepy.RateLimitError("testing (this should not lead to a fail)")
else:
return (arg, kwarg)
print bcolors.OKBLUE + '[*] List of trend topics retrieved for ' + str(location)
listtrendsjson = json.dumps(listtrends[0])
listtrendsparsed = json.loads(listtrendsjson)
for elemento in listtrendsparsed['trends']:
print bcolors.OKBLUE + '[*] ' + bcolors.OKGREEN + elemento['name'] + bcolors.OKBLUE + ' - Tweet Volume: ' + bcolors.OKGREEN + str(elemento['tweet_volume'])
except:
raise
sys.exit()
while True:
try:
public_tweets = tweepy.Cursor(api.search, q=ricerca[0]).items(1)
break
except tweepy.RateLimitError:
logginf.critical(" [*] WARNING: Rate Limit error encountered. Sleeping 15 minutes")
PrintRateLimit()
time.sleep(61*15)
print logging.warning(bcolors.HEADER + " [*] Retrying..." + bcolors.OKBLUE)
continue
for tweet in public_tweets:
LastTweetText = tweet.text
LastTweetId = tweet.id
logging.warning(bcolors.HEADER + "[*] Twitint is being initialized. This will take a minute" + bcolors.OKBLUE)
while True:
trovato = False
trovatototal = 0
logging.warning(bcolors.HEADER + " [*] Waiting 60 seconds in order to avoid Twitter API Rate Limit" + bcolors.OKBLUE)
shuffle(followers)
messages = config_data["messages"]
greetings = ['Hey', 'Hi', 'Hello']
# tries sending a message to your followers. switches greeting and message.
print('Starting to send messages... ')
for user, message, greeting in zip(followers, cycle(messages), cycle(greetings)):
try:
username = api.get_user(user).screen_name
# sends dm.
api.send_direct_message(user_id=user, text='{} {},\n{}'.format(greeting, username, message))
total_followed += 1
if total_followed % 5 == 0:
print(str(total_followed) + ' messages sent so far.')
print('Sent the user a DM. Sleeping 45 seconds.')
sleep(45)
except (tweepy.RateLimitError, tweepy.TweepError) as e:
error_handling(e)
print(total_followed)
mentions = api.direct_messages()
else:
mentions = api.mentions_timeline(since_id=last_dm[0])
user.set_last_twitter_request(time())
for status in mentions:
text = re.sub(
"(?<=^|(?<=[^a-zA-Z0-9-_\.]))@([A-Za-z]+[A-Za-z0-9-_]+)",
"", status.text)
reports.append(report.Report(status.author.screen_name,
"twitterDM",
text,
status.id,
status.created_at))
user.save_seen_dm(last_dm)
return reports
except tweepy.RateLimitError:
logger.error("Twitter API Error: Rate Limit Exceeded",
exc_info=True)
# :todo implement rate limiting
except requests.exceptions.ConnectionError:
logger.error("Twitter API Error: Bad Connection", exc_info=True)
except tweepy.TweepError:
logger.error("Twitter API Error: General Error", exc_info=True)
return []
j = len(friends)
while True:
try:
try:
user_details += self.connection.api.lookup_users(user_ids=friends[i:j],
tweet_mode='extended')
except tweepy.error.TweepError as e:
if "No user matches for specified terms." in e.reason:
stdout.write(f"No user matches for {friends[i:j]}")
stdout.flush()
else:
raise e
self.connection.calls_dict['/users/lookup'] = 1
break
except tweepy.RateLimitError:
self.check_API_calls_and_update_if_necessary(endpoint='/users/lookup',
check_calls=False)
i += 100
return user_details
time_per_100.append(t1-t0)
fetch_count = fetch_count + 1
avg_time_per_fetch = sum(time_per_100)/len(time_per_100)
tweet_count += len(status_obj)
modified_at = datetime.datetime.now().strftime('%H:%M:%S %d-%m-%Y')
print("Scraped {0} tweets, Total ={1} tweets".format(
len(status_obj), tweet_count))
# save all the stats to REDIS
r.set('tweet_count', tweet_count)
r.set('avg_time_per_fetch', avg_time_per_fetch)
r.set('fetch_count', fetch_count)
r.set('modified_at', modified_at)
# r.set('target', target)
except tweepy.RateLimitError:
print("Going to Sleep")
time.sleep(15 * 60)
except Exception as e:
print(str(e))
time.sleep(15 * 60)
finally:
t0 = time.time()
tweet_id_list[:] = []
# Stop thread if no of tweets > 1,00,00,000
if(fetch_count*100 > target):
print "Fetched All the Tweets"
return
def stream(api, args):
while True:
try:
listener = LessListener(api,
post_replies=args.post_replies,
gather=args.gather,
state_dir=args.state)
stream = tweepy.Stream(api.auth, listener)
if args.use_public_stream:
stream.filter(track=['less'])
else:
stream.userstream(replies='all')
except tweepy.RateLimitError:
log.warning("Rate-limited, and Tweepy didn't save us; time for a nap",
exc_info=True)
time.sleep(15 * 60)
"""
https://dev.twitter.com/rest/reference/get/followers/ids
Requests / 15-min window (app auth): 15
"""
api = get_tweepy_api()
ids = []
is_first = True
try:
for page in tweepy.Cursor(api.followers_ids, screen_name=screen_name, count=5000).pages():
if not is_first:
time.sleep(60)
else:
is_first = False
ids.extend(page)
except tweepy.RateLimitError:
extra_data = {
'screen_name' : screen_name,
}
rollbar.report_exc_info(extra_data=extra_data)
return ids
def limit_rate(cursor):
while 1:
try:
yield cursor.next()
except tweepy.RateLimitError:
print('rate limit exceeded -> sleeping 15.25min')
time.sleep(15 * 61) # better be sure
def get_friends(self):
for friend_id in self.api.friends_ids(self.user.id):
try:
yield self.api.get_user(friend_id)
except tweepy.RateLimitError:
logging.info("[%s] Timeout Reached, Sleep for 15 minutes before restart" % self.process_name)
time.sleep(15 * 60)
logging.info("[%s] Waking up. Try again" % self.process_name)
except StopIteration:
logging.info("[%s] Stop Iteration, process complete" % self.process_name)
break
except Exception as e:
logging.info("[%s] Generic Error, restart in 60 seconds: %s" % (self.process_name, e))
time.sleep(60)