Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_delete_start_redirect_url_error(self):
    """A TweepError from redirect_url during POST /delete/start becomes a redirect.

    OAuthStartHandler.redirect_url is stubbed to raise a TweepError. The
    handler is expected to answer with a 302 to /fake/0123456789 and to put
    the error message into the URL fragment.
    """
    self.mox.StubOutWithMock(testutil.OAuthStartHandler, 'redirect_url')
    testutil.OAuthStartHandler.redirect_url(
        state=mox.IgnoreArg()
    ).AndRaise(tweepy.TweepError('Connection closed unexpectedly...'))
    self.mox.ReplayAll()

    form = {
        'feature': 'listen',
        'key': self.sources[0].key.urlsafe(),
    }
    resp = app.application.get_response(
        '/delete/start',
        method='POST',
        body=native_str(urllib.parse.urlencode(form)),
    )

    # assertEqual replaces the deprecated assertEquals alias, which no longer
    # exists on unittest.TestCase in Python 3.12+.
    self.assertEqual(302, resp.status_int)
    location = urllib.parse.urlparse(resp.headers['Location'])
    self.assertEqual('/fake/0123456789', location.path)
    self.assertEqual(
        '!FakeSource API error 504: Connection closed unexpectedly...',
        urllib.parse.unquote(location.fragment))
from tweepy import OAuthHandler
from tweepy import Stream
from kafka import SimpleProducer, KafkaClient
from six.moves import configparser
from multiprocessing.pool import ThreadPool
# User credentials for the Twitter API, read from the [auth] section of
# config.ini in the current working directory.
config = configparser.ConfigParser()
config.read('config.ini')
access_token, access_token_secret, consumer_key, consumer_secret = (
    config.get('auth', option)
    for option in (
        'access_token',
        'access_token_secret',
        'consumer_key',
        'consumer_secret',
    )
)
class StdOutListener(StreamListener):
    """Stream listener that forwards each received message and echoes it."""

    def on_data(self, data):
        """Send ``data`` (UTF-8 encoded) to the "tweetdata" topic and print it.

        ``producer`` is a module-level object defined outside this class
        (presumably a Kafka producer -- confirm against the rest of the file).

        Always returns True.
        """
        payload = data.encode('utf-8')
        producer.send_messages("tweetdata", payload)
        print(data)
        return True

    def on_error(self, status):
        """Print the error ``status`` passed in by the stream."""
        print("Status - ", status)
# StdOutListener (above) is a basic listener: it sends each received tweet to the "tweetdata" topic and prints it to stdout.
def start_streaming(stream):
    # NOTE(review): only the nested helper is visible here and nothing in view
    # calls map_func, so the remainder of this function may be missing --
    # confirm against the full file before relying on it.
    def map_func(topics):
        # Filter `stream` on the given track terms.
        stream.filter(track=topics)
def test_follow_encoding(self):
    """A non-ASCII ``follow`` value must be UTF-8 encoded in the stream body."""
    stream = Stream(None, None)
    # Replace _start with a no-op (presumably so that filter() does not try
    # to open a connection -- confirm against the Stream implementation).
    stream._start = lambda is_async: None

    followed = u'Caf\xe9'
    stream.filter(follow=[followed])

    self.assertEqual(followed.encode('utf8'), stream.body['follow'])
def test_sanitize(filename, expected):
    """Check that get_sanitized_text strips entities and links from a fixture.

    Args:
        filename: name of a JSON status fixture inside the ``tests`` directory.
        expected: the exact sanitized text the fixture should produce.
    """
    # Local import: io.open accepts ``encoding`` on both Python 2 and 3.
    import io

    api = NonCallableMock()
    # Read the fixture as UTF-8 explicitly; relying on the platform default
    # encoding can fail on non-ASCII tweet text.
    with io.open(os.path.join('tests', filename), 'r', encoding='utf-8') as f:
        status = Status.parse(api, json.load(f))

    text = get_sanitized_text(status)
    assert '&' not in text
    assert 'http' not in text
    assert text == expected
def get_tweepy_status_populated(tweetsFilePath='tests/testTweetOut.json'):
    """Load sample tweets from a JSON file as tweepy Status objects.

    Args:
        tweetsFilePath: path to a JSON file holding a sequence of tweet
            payloads.

    Returns:
        A list with one parsed ``tweepy.models.Status`` per entry in the
        file, in file order. Each status is parsed with ``api`` set to None.
    """
    # Local import: io.open accepts ``encoding`` on both Python 2 and 3.
    import io

    # Read as UTF-8 explicitly rather than with the platform default encoding.
    with io.open(tweetsFilePath, 'r', encoding='utf-8') as ifl:
        tweet_set = json.load(ifl)

    # parse is called on the class (as elsewhere in this code) instead of on
    # a throwaway Status() instance.
    return [tweepy.models.Status.parse(None, raw) for raw in tweet_set]
def update_status(self, **kwargs):
    """Record an update request and return a Status object standing in for it.

    The keyword arguments are appended to ``self._updates``. The returned
    Status has ``id`` set to the number of updates recorded so far and
    ``author`` set to ``self.me()``.
    """
    self._updates.append(kwargs)

    status = Status(api=self)
    status.id = len(self._updates)
    status.author = self.me()
    # Status.user is deprecated, so it is deliberately left unset.
    return status
def raise_rate_limit_once(self, arg, kwarg=0):
    """Raise a RateLimitError on the first call, then echo the arguments.

    While ``self.first_run`` is truthy the flag is cleared and
    ``tweepy.RateLimitError`` is raised; on every later call the arguments
    are returned as the tuple ``(arg, kwarg)``.
    """
    if not self.first_run:
        return (arg, kwarg)

    # Clear the flag before raising so that the next call succeeds.
    self.first_run = False
    raise tweepy.RateLimitError("testing (this should not lead to a fail)")
def test_replace_urls(self):
    """replace_urls keeps URL-free text and swaps short URLs for expanded ones."""
    # Local import keeps this fix self-contained.
    import copy

    assert helpers.replace_urls(self.status) == TWEET['text']

    # Modify a private copy: changing the module-level TWEET fixture in place
    # would leak the altered text and entities into tests that run later.
    tweet = copy.deepcopy(TWEET)
    tweet['entities']['urls'] = [{"indices": [0, 12], "expanded_url": "http://long.long.url"}]
    tweet['text'] = 'http://short hey'

    status = tweepy.Status.parse(self.api, tweet)
    assert helpers.replace_urls(status) == "http://long.long.url hey"
def fake_timeline():
    """Return every entry of TIMELINE parsed into a tweepy Status."""
    parse_status = tweepy.Status.parse
    return [parse_status(tweepy.api, raw) for raw in TIMELINE]
def setUp(self):
    """Build the API/status fixtures and parse a fixed command line."""
    self.api = tweepy.API()
    self.status = tweepy.Status.parse(self.api, TWEET)

    shared_options = args.parent(version='1.2.3')
    self.parser = argparse.ArgumentParser(
        description='desc',
        parents=[shared_options],
    )

    # parse_args() is called without arguments, so it reads sys.argv; set it
    # first. NOTE(review): sys.argv is not restored here -- confirm whether
    # a tearDown elsewhere does that.
    sys.argv = ['test', '--dry-run', '-v', '-c', self.yaml]
    self.args = self.parser.parse_args()