Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def save_metadata_json(self, filename: str, post: Post) -> None:
    """Saves metadata JSON file of a :class:`Post`.

    :param filename: Target path without extension; ``.json`` is appended.
    :param post: The object to serialize. Values that :mod:`json` cannot
       encode natively are passed to ``Post.json_encoder``.
    """
    filename += '.json'
    # Use a context manager so the file handle is flushed and closed
    # deterministically, even if serialization raises midway.
    with open(filename, 'w') as fp:
        json.dump(post, fp=fp, indent=4, default=Post.json_encoder)
    self._log('json', end=' ', flush=True)
def visit_Name(self, node: ast.Name):
    # pylint:disable=invalid-name,no-self-use
    """Rewrite a bare name ``x`` in a filter expression into ``post.x``.

    Raises InvalidArgumentException if the name is not used in a load
    context, is not an attribute of ``Post``, or is listed in
    ``Post.LOGIN_REQUIRING_PROPERTIES`` while ``logged_in`` is false.
    """
    name = node.id
    if not isinstance(node.ctx, ast.Load):
        raise InvalidArgumentException("Invalid filter: Modifying variables ({}) not allowed.".format(name))
    if not hasattr(Post, name):
        raise InvalidArgumentException("Invalid filter: Name {} is not defined.".format(name))
    if name in Post.LOGIN_REQUIRING_PROPERTIES and not logged_in:
        raise InvalidArgumentException("Invalid filter: Name {} requires being logged in.".format(name))
    # Build the replacement ``post.<name>`` node, copying the source location
    # of the original node onto each new node.
    post_ref = ast.copy_location(ast.Name('post', ast.Load()), node)
    load_ctx = ast.copy_location(ast.Load(), node)
    attribute = ast.Attribute(post_ref, name, load_ctx)
    return ast.copy_location(attribute, node)
def visit_Name(self, node: ast.Name):
    # pylint:disable=invalid-name,no-self-use
    """Validate a name node of a filter and turn it into ``post.<name>``.

    The name must be read-only (load context), must exist as an attribute
    of ``Post``, and must not be one of ``Post.LOGIN_REQUIRING_PROPERTIES``
    unless ``logged_in`` is true; otherwise InvalidArgumentException is raised.
    """
    identifier = node.id
    if not isinstance(node.ctx, ast.Load):
        raise InvalidArgumentException("Invalid filter: Modifying variables ({}) not allowed.".format(identifier))
    if not hasattr(Post, identifier):
        raise InvalidArgumentException("Invalid filter: Name {} is not defined.".format(identifier))
    if identifier in Post.LOGIN_REQUIRING_PROPERTIES and not logged_in:
        raise InvalidArgumentException("Invalid filter: Name {} requires being logged in.".format(identifier))
    # The attribute access replaces the bare name at the same source location.
    target = ast.copy_location(ast.Name('post', ast.Load()), node)
    context = ast.copy_location(ast.Load(), node)
    return ast.copy_location(ast.Attribute(target, identifier, context), node)
def get_profile_posts(self, profile_metadata: Dict[str, Any], rhx_gis: str) -> Iterator[Post]:
    """Retrieve all posts from a profile.

    The first page of posts is taken from ``profile_metadata`` itself; any
    further pages are fetched through :meth:`graphql_query`.

    :param profile_metadata: Profile metadata containing a ``'user'`` entry.
    :param rhx_gis: Value passed through to :meth:`graphql_query`.
    :return: Iterator over the profile's posts, page by page.
    """
    user = profile_metadata['user']
    profile_name = user['username']
    profile_id = int(user['id'])

    if 'media' in user:
        # backwards compatibility with old non-graphql structure
        timeline = user['media']
        for node in timeline['nodes']:
            yield Post(self, node, profile=profile_name, profile_id=profile_id)
    else:
        timeline = user['edge_owner_to_timeline_media']
        for edge in timeline['edges']:
            yield Post(self, edge['node'], profile=profile_name, profile_id=profile_id)

    page_info = timeline['page_info']
    more_pages = page_info['has_next_page']
    cursor = page_info['end_cursor']
    referer = 'https://www.instagram.com/{0}/'.format(profile_name)

    while more_pages:
        # We do not use self.graphql_node_list() here, because profile_metadata
        # lets us obtain the first 12 nodes 'for free'
        variables = {'id': user['id'],
                     'first': Instaloader.GRAPHQL_PAGE_LENGTH,
                     'after': cursor}
        response = self.graphql_query(17888483320059182, variables,
                                      referer=referer, rhx_gis=rhx_gis)
        timeline = response['data']['user']['edge_owner_to_timeline_media']
        for edge in timeline['edges']:
            yield Post(self, edge['node'], profile=profile_name, profile_id=profile_id)
        page_info = timeline['page_info']
        more_pages = page_info['has_next_page']
        cursor = page_info['end_cursor']
def get_hashtag_posts(self, hashtag: str) -> Iterator[Post]:
    """Get Posts associated with a #hashtag.

    Pages are requested one at a time through :meth:`get_json`; the end
    cursor of each page is sent as ``max_id`` with the next request.

    :param hashtag: Hashtag to look up, inserted into the request path.
    :return: Iterator over the posts found for the hashtag.
    """
    cursor = None
    while True:
        query = {'__a': 1}
        if cursor:
            # Only follow-up requests carry a pagination cursor.
            query['max_id'] = cursor
        response = self.get_json('explore/tags/{0}/'.format(hashtag), query)
        media = response['graphql']['hashtag']['edge_hashtag_to_media']
        for edge in media['edges']:
            yield Post(self, edge['node'])
        page_info = media['page_info']
        more_pages = page_info['has_next_page']
        cursor = page_info['end_cursor']
        if not more_pages:
            break
# NOTE(review): the enclosing ``def`` line is not visible in this excerpt;
# this looks like the body of a generator method that yields the logged-in
# user's saved posts -- confirm against the full file.
# Without a login nothing is yielded.
if not self.is_logged_in:
return
# Metadata of the profile named by self.username supplies the first page.
data, full_metadata = self.get_profile_metadata(self.username)
user_id = data["user"]["id"]
while True:
# Two response layouts are handled: "edge_saved_media" (items under
# "edges", each wrapping a "node") and "saved_media" (items under "nodes").
if "edge_saved_media" in data["user"]:
is_edge = True
saved_media = data["user"]["edge_saved_media"]
else:
is_edge = False
saved_media = data["user"]["saved_media"]
if is_edge:
yield from (Post(self, edge["node"]) for edge in saved_media["edges"])
else:
yield from (Post(self, node) for node in saved_media["nodes"])
# Stop once the current page reports that no further page exists.
if not saved_media["page_info"]["has_next_page"]:
break
# Fetch the next page, continuing after the current page's end cursor; the
# result replaces ``data`` and is inspected at the top of the loop.
data = self.graphql_query("f883d95537fbcd400f466f63d42bd8a1",
{'id': user_id, 'first': Instaloader.GRAPHQL_PAGE_LENGTH,
'after': saved_media["page_info"]["end_cursor"]},
rhx_gis=full_metadata['rhx_gis'])['data']
def get_username_by_id(self, profile_id: int) -> str:
    """To get the current username of a profile, given its unique ID, this function can be used.

    The profile's first timeline post is looked up and the owner's username
    of that post is returned.

    :param profile_id: Unique ID of the profile.
    :raises ProfileNotExistsException: If the query returns no user.
    :raises ProfileHasNoPicsException: If the timeline reports a count of 0.
    :raises LoginRequiredException: If posts exist but none were returned.
    """
    user = self.graphql_query(17862015703145017, {'id': str(profile_id), 'first': 1})['data']['user']
    if not user:
        raise ProfileNotExistsException("No profile found, the user may have blocked you (ID: " +
                                        str(profile_id) + ").")
    timeline = user["edge_owner_to_timeline_media"]
    if timeline['edges']:
        first_post_id = int(timeline['edges'][0]["node"]["id"])
        return Post.from_mediaid(self, first_post_id).owner_username
    # No edges returned: tell an empty profile apart from one whose posts
    # were not included in the response.
    if timeline['count'] == 0:
        raise ProfileHasNoPicsException("Profile with ID {0}: no pics found.".format(str(profile_id)))
    raise LoginRequiredException("Login required to determine username (ID: " + str(profile_id) + ").")
def __eq__(self, o: object) -> bool:
    """Compare with another :class:`Post` by shortcode.

    Returns ``NotImplemented`` for objects that are not a :class:`Post`.
    """
    if not isinstance(o, Post):
        return NotImplemented
    return self.shortcode == o.shortcode
def get_profile_posts(self, profile_metadata: Dict[str, Any], rhx_gis: str) -> Iterator[Post]:
"""Retrieve all posts from a profile."""
profile_name = profile_metadata['user']['username']
profile_id = int(profile_metadata['user']['id'])
if 'media' in profile_metadata['user']:
# backwards compatibility with old non-graphql structure
yield from (Post(self, node, profile=profile_name, profile_id=profile_id)
for node in profile_metadata['user']['media']['nodes'])
has_next_page = profile_metadata['user']['media']['page_info']['has_next_page']
end_cursor = profile_metadata['user']['media']['page_info']['end_cursor']
else:
yield from (Post(self, edge['node'], profile=profile_name, profile_id=profile_id)
for edge in profile_metadata['user']['edge_owner_to_timeline_media']['edges'])
has_next_page = profile_metadata['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']
end_cursor = profile_metadata['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
while has_next_page:
# We do not use self.graphql_node_list() here, because profile_metadata
# lets us obtain the first 12 nodes 'for free'
data = self.graphql_query(17888483320059182, {'id': profile_metadata['user']['id'],
'first': Instaloader.GRAPHQL_PAGE_LENGTH,
'after': end_cursor},
referer='https://www.instagram.com/{0}/'.format(profile_name), rhx_gis=rhx_gis)
media = data['data']['user']['edge_owner_to_timeline_media']