Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def copies_count(self, request, **kwargs):
"""
Returns the number of node copies for each content id.
"""
content_id_string = self.request.query_params.get("content_ids")
if content_id_string:
content_ids = content_id_string.split(",")
counts = (
models.ContentNode.objects.filter_by_content_ids(content_ids)
.filter(available=True)
.values("content_id")
.order_by()
.annotate(count=Count("content_id"))
)
else:
counts = 0
return Response(counts)
def to_representation(self, instance):
# TODO: rtibbles - cleanup this for device specific serializer.
value = super(ChannelMetadataSerializer, self).to_representation(instance)
value.update({"num_coach_contents": get_num_coach_contents(instance.root)})
# if the request includes a GET param 'include_fields', add the requested calculated fields
if 'request' in self.context:
include_fields = self.context['request'].GET.get('include_fields', '').split(',')
if include_fields:
# build querysets for the full set of channel nodes, as well as those that are unrenderable
channel_nodes = ContentNode.objects.filter(channel_id=instance.id)
unrenderable_nodes = channel_nodes.exclude(renderable_contentnodes_without_topics_q_filter)
if 'total_resources' in include_fields:
# count the total number of renderable non-topic resources in the channel
# (note: it's faster to count them all and then subtract the unrenderables, of which there are fewer)
value['total_resources'] = channel_nodes.dedupe_by_content_id().count() - unrenderable_nodes.dedupe_by_content_id().count()
if 'total_file_size' in include_fields:
# count the total file size of files associated with renderable content nodes
# (note: it's faster to count them all and then subtract the unrenderables, of which there are fewer)
value['total_file_size'] = _total_file_size(channel_nodes) - _total_file_size(unrenderable_nodes)
if 'on_device_resources' in include_fields:
# read the precalculated total number of resources from the channel already available
value['on_device_resources'] = instance.total_resource_count
def get_nodes_to_transfer(
channel_id,
node_ids,
exclude_node_ids,
available,
renderable_only=True,
drive_id=None,
peer_id=None,
):
nodes_to_include = ContentNode.objects.filter(channel_id=channel_id)
# if requested, filter down to only include particular topics/nodes
if node_ids:
nodes_to_include = nodes_to_include.filter_by_uuids(node_ids).get_descendants(
include_self=True
)
# if requested, filter out nodes we're not able to render
if renderable_only:
nodes_to_include = nodes_to_include.filter(renderable_contentnodes_q_filter)
# filter down the query to remove files associated with nodes we've specifically been asked to exclude
if exclude_node_ids:
nodes_to_exclude = ContentNode.objects.filter_by_uuids(
exclude_node_ids
).get_descendants(include_self=True)
def get_by_channel_id_and_content_id(channel_id, content_id):
"""
Function to return a content node based on a channel_id and content_id
"""
if channel_id and content_id:
try:
return ContentNode.objects.filter(
channel_id=channel_id, content_id=content_id
).first()
except ValueError:
# Raised if a malformed UUID is passed
pass
def get_content_title(self, obj):
node = ContentNode.objects.filter(content_id=obj.content_id).first()
if node:
return node.title
else:
return ""
language_id = params.get("language", "").strip()
channels = None
if identifier:
channels = ChannelMetadata.objects.filter(pk=identifier)
else:
channels = ChannelMetadata.objects.all()
if keyword != "":
channels = channels.filter(
Q(name__icontains=keyword) | Q(description__icontains=keyword)
)
if language_id != "":
matching_tree_ids = (
ContentNode.objects.prefetch_related("files")
.filter(
Q(lang__id__icontains=language_id)
| Q(files__lang__id__icontains=language_id)
)
.values_list("tree_id", flat=True)
)
channels = channels.filter(
Q(root__lang__id__icontains=language_id)
| Q(root__tree_id__in=matching_tree_ids)
)
return channels.filter(root__available=True).distinct()
def get_by_node_id(node_id):
"""
Function to return a content node based on a node id
"""
if node_id:
try:
return ContentNode.objects.get(id=node_id)
except (ContentNode.DoesNotExist, ValueError):
# not found, or the id is invalid
pass
lesson_assignments__collection__in=learner_groups,
is_active=True,
resources__regex=r"" + content_id + "",
).distinct()
# get the contentnode_id for each lesson:
lesson_contentnode = {
lesson.id: r["contentnode_id"]
for lesson in filtered_lessons
for r in lesson.resources
if (r["content_id"] == content_id and r["channel_id"] == channel_id)
}
if attempt:
# This part is for the NeedsHelp event. These Events can only be triggered on Exercises:
to_delete = []
for lesson_id, contentnode_id in lesson_contentnode.items():
content_node = ContentNode.objects.get(pk=contentnode_id)
if content_node.kind != content_kinds.EXERCISE:
to_delete.append(lesson_id)
for lesson_id in to_delete:
del lesson_contentnode[lesson_id]
# Returns all the affected lessons with the touched contentnode_id, Resource must be inside a lesson
lesson_resources = [
(lesson, lesson_contentnode[lesson.id])
for lesson in filtered_lessons
if lesson.id in lesson_contentnode
]
# Try to find out if the lesson is being executed assigned to a Classroom or to a LearnerGroup:
for lesson in lesson_resources:
assignments = [l.collection_id for l in lesson[0].lesson_assignments.all()]
groups = [g.id for g in learner_groups if g.id in assignments]
lesson[0].group_or_classroom = groups[0] if groups else lesson.collection_id
def fix_multiple_trees_with_tree_id1():
# Do a check for improperly imported ContentNode trees
# These trees have been naively imported, and so there are multiple trees
# with tree_ids set to 1. Just check the root nodes to reduce the query size.
tree_id_one_channel_ids = ContentNode.objects.filter(
parent=None, tree_id=1
).values_list("channel_id", flat=True)
if len(tree_id_one_channel_ids) > 1:
logger.warning("Improperly imported channels discovered")
# There is more than one channel with a tree_id of 1
# Find which channel has the most content nodes, and then delete and reimport the rest.
channel_sizes = {}
for channel_id in tree_id_one_channel_ids:
channel_sizes[channel_id] = ContentNode.objects.filter(
channel_id=channel_id
).count()
# Get sorted list of ids by increasing number of nodes
sorted_channel_ids = sorted(channel_sizes, key=channel_sizes.get)
# Loop through all but the largest channel, delete and reimport
count = 0
for channel_id in sorted_channel_ids[:-1]:
# Double check that we have a content db to import from before deleting any metadata
if os.path.exists(get_content_database_file_path(channel_id)):
logger.warning(
"Deleting and reimporting channel metadata for {channel_id}".format(
channel_id=channel_id
)
)
ChannelMetadata.objects.get(