Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
#!/usr/bin/env python
import settings
import csv
import redis
import tempfile
import time
from tweepy.api import API
from tweepy.models import Status
from tweepy.utils import import_simplejson
from tweepy.utils import convert_to_utf8_str
# Module-level tweepy API instance built with no arguments; bulk_load passes
# it to Status.parse.
api = API()
# Object returned by tweepy's import_simplejson(); bulk_load calls its
# loads() to decode each tweet.
json = import_simplejson()
def bulk_load(listkey, tweets, output_dir='/home/marcua/data/tweets'):
    """Write the text of every tweet in *tweets* to a file named *listkey*.

    Args:
        listkey: Name of the output file created inside *output_dir*.
        tweets: Iterable of JSON-encoded tweets; each one is decoded with
            json.loads and parsed with Status.parse.
        output_dir: Directory the file is written to. Defaults to the path
            that used to be hard-coded, so existing two-argument callers
            behave exactly as before.

    The file is opened in 'w' mode, so an existing file with the same name
    is truncated. Each tweet's text is passed through convert_to_utf8_str
    and followed by a newline.
    """
    path = '%s/%s' % (output_dir, listkey)
    with open(path, 'w') as tmpfile:
        # Parenthesised single-argument form prints the same text on
        # Python 2 and is also valid Python 3 syntax.
        print("file %s" % (tmpfile.name))
        for jsontweet in tweets:
            tweet = Status.parse(api, json.loads(jsontweet))
            tmpfile.write(convert_to_utf8_str(tweet.text) + "\n")
def poll_data(r):
    """Process one pending list key: dump its tweets to disk, then clean up.

    Args:
        r: Redis client object; rpoplpush, lrange, lrem and delete are
            called on it.

    Does nothing when rpoplpush returns None.
    """
    # The key is moved from TO_INDEX onto CONSUMED_INDICES by a single
    # rpoplpush call, so it stays listed there while it is being processed.
    listkey = r.rpoplpush(settings.TO_INDEX, settings.CONSUMED_INDICES)
    # Identity test: None is a singleton, so `is` is the correct comparison.
    if listkey is None:
        return
    tweets = r.lrange(listkey, 0, -1)
    bulk_load(listkey, tweets)
    # Only after the dump succeeded: drop the bookkeeping entry, then the data.
    r.lrem(settings.CONSUMED_INDICES, listkey, 0)
    r.delete(listkey)
def __init__(self, api=None):
    """Store *api* on the instance, creating a default API() if it is falsy."""
    if not api:
        api = API()
    self.api = api
def get_username(self):
    """Return self.username, looking it up on first use.

    When self.username is None, an API built from this object is asked to
    verify_credentials(); the returned user's screen_name is stored on
    self.username and returned.

    Raises:
        TweepError: if verify_credentials() returns a falsy value.
    """
    # Already known: no API round-trip needed.
    if self.username is not None:
        return self.username
    account = API(self).verify_credentials()
    if not account:
        raise TweepError('Unable to get username,'
                         ' invalid oauth token!')
    self.username = account.screen_name
    return self.username
def __init__(self, auth, listener, **options):
    """Set up the instance from *auth*, *listener* and keyword options.

    Recognised options and their fallbacks: timeout (300.0),
    retry_count (None), retry_time (10.0), snooze_time (5.0),
    buffer_size (1500), secure (True), headers ({}).
    """
    self.auth = auth
    self.listener = listener
    self.running = False
    # Options that are copied straight onto the instance under the same name.
    option_defaults = (
        ("timeout", 300.0),
        ("retry_count", None),
        ("retry_time", 10.0),
        ("snooze_time", 5.0),
        ("buffer_size", 1500),
    )
    for option_name, fallback in option_defaults:
        setattr(self, option_name, options.get(option_name, fallback))
    # A truthy "secure" option (the default) selects https.
    self.scheme = "https" if options.get("secure", True) else "http"
    self.api = API()
    self.headers = options.get("headers") or {}
    self.parameters = None
    self.body = None
"""
Tweepy Twitter API library
"""
# Package metadata: version string, author and license.
__version__ = '2.0'
__author__ = 'Joshua Roesslein'
__license__ = 'MIT'
from tweepy.models import Status, User, DirectMessage, Friendship, SavedSearch, SearchResults, ModelFactory, Category
from tweepy.error import TweepError
from tweepy.api import API
from tweepy.cache import Cache, MemoryCache, FileCache
from tweepy.auth import BasicAuthHandler, OAuthHandler
from tweepy.streaming import Stream, StreamListener
from tweepy.cursor import Cursor
# Global, unauthenticated instance of API, constructed with no arguments
# at the point this statement runs.
api = API()
def debug(enable=True, level=1):
    """Switch HTTP connection debug output on or off.

    Args:
        enable: When true, HTTPConnection.debuglevel is set to *level*;
            when false it is reset to 0. Previously this flag was ignored
            and debug(False) still set the level to 1.
        level: Debug level to apply when *enable* is true.

    The value is assigned on the HTTPConnection class itself, not on a
    single connection object.
    """
    try:
        import httplib
    except ImportError:
        # The module is named http.client on Python 3.
        import http.client as httplib
    httplib.HTTPConnection.debuglevel = level if enable else 0
def __init__(self, username, password, listener, timeout=5.0, retry_count=None,
             retry_time=10.0, snooze_time=5.0, buffer_size=1500):
    """Set up the instance with basic-auth credentials and a listener.

    *username* and *password* are wrapped in a BasicAuthHandler; the
    remaining arguments are stored on the instance under their own names.
    """
    self.auth = BasicAuthHandler(username, password)
    self.listener = listener
    self.running = False
    # Tuning values supplied by the caller.
    self.timeout, self.buffer_size = timeout, buffer_size
    self.retry_count, self.retry_time = retry_count, retry_time
    self.snooze_time = snooze_time
    self.api = API()
    # No custom headers or request body until they are set later.
    self.headers = {}
    self.body = None
def __init__(self, api=None):
    """Keep the given *api*; build a new API() when none (or a falsy one) is passed."""
    self.api = api if api else API()
"""
Tweepy Twitter API library
"""
# Package metadata: version string, author and license.
__version__ = '1.5'
__author__ = 'Joshua Roesslein'
__license__ = 'MIT'
from tweepy.models import Status, User, DirectMessage, Friendship, SavedSearch, SearchResult, ModelFactory
from tweepy.error import TweepError
from tweepy.api import API
from tweepy.cache import Cache, MemoryCache, FileCache
from tweepy.auth import BasicAuthHandler, OAuthHandler
from tweepy.streaming import Stream, StreamListener
from tweepy.cursor import Cursor
# Global, unauthenticated instance of API, constructed with no arguments
# at the point this statement runs.
api = API()
def debug(enable=True, level=1):
    """Switch HTTP connection debug output on or off.

    Args:
        enable: When true, HTTPConnection.debuglevel is set to *level*;
            when false it is reset to 0. Previously this flag was ignored
            and debug(False) still set the level to 1.
        level: Debug level to apply when *enable* is true.

    The value is assigned on the HTTPConnection class itself, not on a
    single connection object.
    """
    try:
        import httplib
    except ImportError:
        # The module is named http.client on Python 3.
        import http.client as httplib
    httplib.HTTPConnection.debuglevel = level if enable else 0
def __init__(self, api=None):
    """Remember which API object to use, defaulting to a new API()."""
    chosen = api
    if not chosen:
        chosen = API()
    self.api = chosen
mongo_config.update({'module': 'collecting-follow'},
{'$set': {'termsList': {'handle': user['handle'], 'id': user_id, 'collect': 0 }}})
# Loops thru current stored handles and adds list if both:
# A) Value isn't set to None (not valid OR no longer in use)
all_stored_handles = [user['handle'] for user in stored_terms]
stored_handles = [user['handle'] for user in stored_terms if user['id'] and user['collect']]
print 'MAIN: %d user ids for collection found in Mongo!' % len(stored_handles)
"""
# Loop thru & query (except handles that have been stored)
print 'MAIN: Querying Twitter API for handle:id pairs...'
logger.info('MAIN: Querying Twitter API for handle:id pairs...')
# Initiates REST API connection
twitter_api = API(auth_handler=auth)
failed_handles = []
success_handles = []
# Loops thru user-given terms list
for item in termsList:
term = item['term']
# If term already has a valid ID, pass
if item['id'] is not None:
pass
# Queries the Twitter API for the ID value of the handle
else:
try:
user = twitter_api.get_user(screen_name=term)
except TweepError as tweepy_exception:
error_message = tweepy_exception.args[0][0]['message']
code = tweepy_exception.args[0][0]['code']
# Rate limited for 15 minutes w/ code 88