# mirror some methods from webutil.testutil
import datetime
import logging

from google.cloud import ndb
from granary import source as gr_source
from oauth_dropins import handlers as oauth_handlers
from oauth_dropins.models import BaseAuth
from oauth_dropins.webutil.testutil import get_task_eta, get_task_params
from oauth_dropins.webutil.util import json_dumps, json_loads
import requests

import util

NOW = datetime.datetime.utcnow()
class FakeAuthEntity(BaseAuth):
  user_json = ndb.TextProperty()
class FakeGrSource(gr_source.Source):
  """Fake granary source class.

  Attributes:
    activities, like, reaction, share, event, rsvp, etag, search_results,
    last_search_query, blocked_ids
  """
  NAME = 'FakeSource'
  DOMAIN = 'fa.ke'

  last_search_query = None
  search_results = []

  def user_url(self, id):
    return 'http://fa.ke/' + id
  def user_to_actor(self, user):
    # the fake source passes the user object through unchanged
    return user
  def get_activities_response(self, **kwargs):
    activities = []
    activity_id = kwargs.get('activity_id')
    if activity_id:
      activities = [{
        'id': activity_id,
        'url': 'https://www.facebook.com/%s/posts/%s' % (self.key.id(), activity_id),
      }]
    return gr_source.Source.make_activities_base_response(activities)
  def is_activity_public(self, activity):
    """Returns True if the given activity is public, False otherwise.

    Just wraps :meth:`granary.source.Source.is_public`. Subclasses may override.
    """
    return gr_source.Source.is_public(activity)
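
# A minimal usage sketch of FakeGrSource (illustrative only; 'alice' and the
# activity dict below are made up):
#
#   src = FakeGrSource()
#   src.user_url('alice')                                    # 'http://fa.ke/alice'
#   src.is_activity_public({'to': [{'alias': '@public'}]})   # True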
    obj['location'].update({
      'latitude': float(lat),
      'longitude': float(long),
    })
  a = {
    'objectType': 'activity',
    'verb': _as1_value(entry, 'verb'),
    'id': _text(entry, 'id') or (obj['id'] if obj_elem is None else None),
    'url': _text(entry, 'uri') or (obj['url'] if obj_elem is None else None),
    'object': obj,
    'actor': _author_to_actor(entry),
    'inReplyTo': obj.get('inReplyTo'),
  }
  return source.Source.postprocess_activity(a)
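
# For illustration, the activity built above has roughly this shape (all
# values here are hypothetical):
#
#   {
#     'objectType': 'activity',
#     'verb': 'post',
#     'id': 'tag:example.com,2013:123',
#     'url': 'http://example.com/post/123',
#     'object': {...},
#     'actor': {'objectType': 'person', 'displayName': 'Alice'},
#     'inReplyTo': None,
#   }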
    already_fetched_hfeeds: set, URLs that we have already fetched and run
      posse-post-discovery on, so we can avoid running it multiple times

  Returns:
    (set(string original post URLs), set(string mention URLs)) tuple
  """
  logging.debug('discovering original posts for: %s',
                activity.get('url') or activity.get('id'))

  if not source.updates:
    source.updates = {}

  if already_fetched_hfeeds is None:
    already_fetched_hfeeds = set()

  originals, mentions = gr_source.Source.original_post_discovery(
    activity, domains=source.domains,
    include_redirect_sources=include_redirect_sources,
    headers=util.request_headers(source=source))

  # only include mentions of the author themselves.
  # (mostly just for Mastodon; other silos' domains are all in the blacklist, so
  # their mention URLs get dropped later anyway.)
  # (these are originally added in Source._inject_user_urls() and in poll step 2.)
  obj = activity.get('object', {})
  other_user_mentions = set(
    t.get('url') for t in obj.get('tags', [])
    if t.get('objectType') == 'person' and t.get('url') not in source.domain_urls)
  originals -= other_user_mentions
  mentions -= other_user_mentions
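
  # Sketch of the filtering above with made-up data: a person-tag pointing at
  # someone other than the author counts as an other-user mention and is
  # dropped from both sets.
  #
  #   source.domain_urls = ['https://alice.example/']
  #   obj['tags'] = [{'objectType': 'person', 'url': 'https://bob.example/'}]
  #   # => other_user_mentions == {'https://bob.example/'}, so that URL is
  #   #    removed from both originals and mentions.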
  # original posts are only from the author themselves
      'object': objects[0] if len(objects) == 1 else objects,
      'actor': author,
    })

    if as_verb == 'tag':
      obj['target'] = {'url': prop['tag-of']}
      if obj.get('object'):
        raise NotImplementedError(
          'Combined in-reply-to and tag-of is not yet supported.')
      obj['object'] = obj.pop('tags')
  else:
    obj.update({
      'inReplyTo': [{'url': url} for url in in_reply_tos],
      'author': author,
    })

  return source.Source.postprocess_object(obj)
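
# Hypothetical illustration of the tag-of branch above: for an mf2 item whose
# 'tag-of' property is 'https://example.com/post' (made up), the result has
#
#   obj['target'] == {'url': 'https://example.com/post'}
#
# and the item's 'tags' list is moved to obj['object'].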
  Returns:
    ([string original post URLs], [string mention URLs]) tuple
  """
  obj = activity.get('object') or activity
  content = obj.get('content', '').strip()

  # find all candidate URLs
  tags = [t.get('url') for t in obj.get('attachments', []) + obj.get('tags', [])
          if t.get('objectType') in ('article', 'mention', None)]
  candidates = tags + util.extract_links(content) + obj.get('upstreamDuplicates', [])

  # Permashortcitations (http://indiewebcamp.com/permashortcitation) are short
  # references to canonical copies of a given (usually syndicated) post, of
  # the form (DOMAIN PATH). We consider them an explicit original post link.
  candidates += [match.expand(r'http://\1/\2') for match in
                 Source._PERMASHORTCITATION_RE.finditer(content)]
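  # e.g. a content string containing '(ttk.me t4_81)' would expand to
  # 'http://ttk.me/t4_81' (a made-up t-code in the style of the examples on
  # the IndieWeb wiki page linked above).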
  candidates = set(filter(None,
                          (util.clean_url(url) for url in candidates
                           # heuristic: ellipsized URLs are probably incomplete, so omit them
                           if url and not url.endswith('...') and not url.endswith(u'…'))))

  # check for redirects and add their final URLs
  redirects = {}  # maps final URL to original URL for redirects
  for url in list(candidates):
    resolved = follow_redirects(url, cache=cache, **kwargs)
    if (resolved.url != url and
        resolved.headers.get('content-type', '').startswith('text/html')):
      redirects[resolved.url] = url
      candidates.add(resolved.url)
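
  # Illustration with made-up URLs: if http://sho.rt/abc redirects to an HTML
  # page at https://example.com/post, then after the loop
  #   redirects == {'https://example.com/post': 'http://sho.rt/abc'}
  # and candidates contains both URLs.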
  # use domains to determine which URLs are original post links vs mentions
"""
activity = util.trim_nulls(activity)
# maps object type to human-readable name to use in title
TYPE_DISPLAY_NAMES = {'image': 'photo', 'product': 'gift'}
# maps verb to human-readable verb
DISPLAY_VERBS = {
'give': 'gave',
'like': 'likes',
'listen': 'listened to',
'play': 'watched',
'read': 'read',
'share': 'shared',
}
actor_name = Source.actor_name(activity.get('actor'))
obj = activity.get('object')
if obj:
activity['object'] = Source.postprocess_object(obj)
if not activity.get('title'):
verb = DISPLAY_VERBS.get(activity.get('verb'))
obj_name = obj.get('displayName')
obj_type = TYPE_DISPLAY_NAMES.get(obj.get('objectType'))
if obj_name and not verb:
activity['title'] = obj_name
elif verb and (obj_name or obj_type):
app = activity.get('generator', {}).get('displayName')
name = obj_name if obj_name else 'a %s' % (obj_type or 'unknown')
app = ' on %s' % app if app else ''
activity['title'] = '%s %s %s%s.' % (actor_name, verb or 'posted',
name, app)
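
# Worked example of the title logic above, with made-up values: an activity
# with verb 'like', actor named 'Alice', object displayName 'My Post', and
# generator displayName 'FakeApp' gets the title:
#
#   'Alice likes My Post on FakeApp.'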
    except ElementTree.ParseError as e:
      raise exc.HTTPBadRequest('Could not parse %s as XML: %s' % (url, e))
    except ValueError as e:
      raise exc.HTTPBadRequest('Could not parse %s as Atom: %s' % (url, e))

  elif input == 'html':
    activities = microformats2.html_to_activities(resp, url, actor)

  elif input in ('mf2-json', 'json-mf2'):
    activities = [microformats2.json_to_object(item, actor=actor)
                  for item in mf2.get('items', [])]

  elif input == 'jsonfeed':
    try:
      activities, actor = jsonfeed.jsonfeed_to_activities(body_json)
    except ValueError as e:
      logging.warning('parsing input failed', stack_info=True)
      self.abort(400, 'Could not parse %s as %s: %s' % (url, input, str(e)))

  self.write_response(source.Source.make_activities_base_response(activities),
                      url=url, actor=actor, title=title, hfeed=hfeed)
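
# This fragment appears to back granary's URL conversion handler. A hedged
# sketch of calling the hosted /url endpoint (parameter names as documented
# in granary's README; the page URL is made up):
#
#   import requests
#   resp = requests.get('https://granary.io/url', params={
#     'url': 'https://example.com/post',
#     'input': 'html',
#     'output': 'as1',
#   })
#   activities = resp.json()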