Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_replace_urls(self):
    """Check that replace_urls swaps shortened URLs for their expanded form.

    First the unmodified fixture status must come back with its text
    unchanged. Then a variant of the fixture whose text starts with a short
    URL (characters 0-12) must come back with that span replaced by the
    entity's ``expanded_url``.
    """
    assert helpers.replace_urls(self.status) == TWEET['text']

    # Build a modified copy rather than writing into TWEET: the fixture is
    # module-level and shared, so mutating it would leak the shortened text
    # and URL entity into every test that runs afterwards (including a
    # re-run of this one, whose first assertion would then fail).
    shortened = dict(
        TWEET,
        text='http://short hey',
        entities=dict(
            TWEET['entities'],
            urls=[{"indices": [0, 12], "expanded_url": "http://long.long.url"}],
        ),
    )
    status = tweepy.Status.parse(self.api, shortened)
    assert helpers.replace_urls(status) == "http://long.long.url hey"
def fake_timeline():
    """Return the TIMELINE fixture entries parsed into tweepy Status objects.

    Entries are parsed in fixture order, each one against ``tweepy.api``.
    """
    statuses = []
    for raw_tweet in TIMELINE:
        statuses.append(tweepy.Status.parse(tweepy.api, raw_tweet))
    return statuses
def setUp(self):
    """Prepare the API/status fixtures and a parsed command line.

    Sets ``self.api``, ``self.status``, ``self.parser`` and ``self.args``.
    ``self.yaml`` is read here, so it must already be set on the instance.
    """
    self.api = tweepy.API()
    self.status = tweepy.Status.parse(self.api, TWEET)

    shared_options = args.parent(version='1.2.3')
    self.parser = argparse.ArgumentParser(
        parents=[shared_options],
        description='desc',
    )

    # parse_args() is called without arguments, so it takes its input from
    # sys.argv; the process-wide argv is replaced with the test command line.
    cli_flags = ['--dry-run', '-v', '-c', self.yaml]
    sys.argv = ['test'] + cli_flags
    self.args = self.parser.parse_args()
def setUp(self):
    """Load the status, text-file and archive fixtures for each test.

    Sets ``self.status`` (TWEET parsed with a fresh tweepy API object),
    ``self.txt`` (the lines of ``data/tweets.txt``) and ``self.archive``
    (the result of ``archive.read_csv`` on the ``data`` directory). Both
    paths are resolved relative to this test module.
    """
    self.status = tweepy.Status.parse(tweepy.API(), TWEET)

    data_dir = path.join(path.dirname(__file__), 'data')
    with open(path.join(data_dir, 'tweets.txt')) as tweets_file:
        self.txt = tweets_file.readlines()

    self.archive = archive.read_csv(data_dir)
# NOTE(review): this excerpt has lost its indentation, starts in the middle of
# a function (data_dict and self are bound outside the visible lines) and the
# final `elif` has no visible body.
# The chain checks data_dict for one top-level key at a time and debug-logs
# the value stored under the first key that is present.
if 'delete' in data_dict:
LOG.debug(
f"data_dict['delete']: {data_dict['delete']}")
elif 'event' in data_dict:
LOG.debug(
f"data_dict['event']: {data_dict['event']}")
# The whole payload is parsed into a tweepy Status; `status` is not read again
# in the visible lines.
status = tweepy.Status.parse(self.api, data_dict)
elif 'direct_message' in data_dict:
LOG.debug(
f"data_dict['direct_message']: {data_dict['direct_message']}")
# The payload is parsed with Status.parse and passed to the aggregator's
# initiate_reply together with LookupSource.DIRECT_MESSAGE.value.
message = tweepy.Status.parse(self.api, data_dict)
self.tweeter.aggregator.initiate_reply(message, LookupSource.DIRECT_MESSAGE.value)
elif 'friends' in data_dict:
LOG.debug(
f"data_dict['friends']: {data_dict['friends']}")
elif 'limit' in data_dict:
LOG.debug(
f"data_dict['limit']: {data_dict['limit']}")
elif 'disconnect' in data_dict:
LOG.debug(
f"data_dict['disconnect']: {data_dict['disconnect']}")
# NOTE(review): the body of the 'warning' branch is not part of this excerpt.
elif 'warning' in data_dict:
# NOTE(review): this excerpt has lost its indentation and the enclosing `def`
# is not visible (data and self are bound outside the visible lines).
# Decode the raw JSON payload into a dict.
# NOTE(review): `any` in the annotation is the builtin function, not
# typing.Any — presumably Dict[Any, Any] was intended; confirm.
data_dict: Dict[any, any] = json.loads(data)
# Re-serialise with indentation and sorted keys purely for the debug log.
formatted_data = json.dumps(data_dict,
indent=4,
sort_keys=True)
LOG.debug(f'data: {formatted_data}')
# Check data_dict for one top-level key at a time and debug-log the value
# stored under the first key that is present.
if 'delete' in data_dict:
LOG.debug(
f"data_dict['delete']: {data_dict['delete']}")
elif 'event' in data_dict:
LOG.debug(
f"data_dict['event']: {data_dict['event']}")
# The whole payload is parsed into a tweepy Status; `status` is not read again
# in the visible lines.
status = tweepy.Status.parse(self.api, data_dict)
elif 'direct_message' in data_dict:
LOG.debug(
f"data_dict['direct_message']: {data_dict['direct_message']}")
# The payload is parsed with Status.parse and passed to the aggregator's
# initiate_reply together with LookupSource.DIRECT_MESSAGE.value.
message = tweepy.Status.parse(self.api, data_dict)
self.tweeter.aggregator.initiate_reply(message, LookupSource.DIRECT_MESSAGE.value)
elif 'friends' in data_dict:
LOG.debug(
f"data_dict['friends']: {data_dict['friends']}")
elif 'limit' in data_dict:
LOG.debug(
f"data_dict['limit']: {data_dict['limit']}")
responses = resp.get('pullResponses')
if responses is not None:
ack_ids = []
messages = []
for response in responses:
message = response.get('pubsubEvent').get('message')
if message:
messages.append( base64.urlsafe_b64decode(str(message.get('data'))) )
ack_ids.append(response.get('ackId'))
rows = []
for message in messages:
try:
tweet_json = json.loads(message)
tweet = tweepy.Status.parse(None, tweet_json)
rows.append({
'id': tweet.id,
'created_at': float(tweet.timestamp_ms)/1000,
'author_name': tweet_author_name(tweet),
'is_reply': tweet_is_reply(tweet),
'is_retweet': tweet_is_retweet(tweet),
'text': tweet.text,
'json': message,
})
except Exception as e:
print e
inserted = client_bq.push_rows(dataset_name, table_name, rows, 'id')
ack_body = {'subscription': subscription, 'ackId': ack_ids}
client.subscriptions().acknowledge(body=ack_body).execute(