def handle(self, *args, **options):
    channel_id = options["channel_id"]
    position = options["pos"]
    count = ChannelMetadata.objects.count()
    try:
        target_channel = ChannelMetadata.objects.get(id=channel_id)
    except (ChannelMetadata.DoesNotExist, ValueError):
        self.stderr.write("Channel with ID {} does not exist".format(channel_id))
        sys.exit(1)
    if position < 1 or position > count:
        self.stderr.write(
            "Invalid position {}. Please choose a value between [1-{}].".format(
                position, count
            )
        )
        sys.exit(1)
    # Shift every channel sitting between the old and new positions by one,
    # in the appropriate direction, then drop the target channel into place.
    ChannelMetadata.objects.filter(
        order__lt=target_channel.order, order__gte=position
    ).update(order=F("order") + 1)
    ChannelMetadata.objects.filter(
        order__gt=target_channel.order, order__lte=position
    ).update(order=F("order") - 1)
    target_channel.order = position
    target_channel.save()
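# A minimal usage sketch for the command body above. The command name
# "setchannelposition" and the positional arguments are assumptions inferred
# from the option keys, not confirmed by the snippet itself.
from django.core.management import call_command

call_command("setchannelposition", "<channel_id>", 2)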
def handle(self, *args, **options):
    storage_channel_ids = get_channel_ids_for_content_database_dir(
        get_content_database_dir_path()
    )
    database_channel_ids = list(
        ChannelMetadata.objects.all().values_list("id", flat=True)
    )
    all_channel_ids = set(storage_channel_ids + database_channel_ids)
    for channel_id in all_channel_ids:
        if channel_id not in database_channel_ids:
            # A channel database exists on disk but has no metadata imported
            # yet: import it, then mark which of its content is present locally.
            try:
                import_channel_from_local_db(channel_id)
                set_content_visibility_from_disk(channel_id)
            except (InvalidSchemaVersionError, FutureSchemaError):
                logger.warning(
                    "Tried to import channel {channel_id}, but database file was incompatible".format(
                        channel_id=channel_id
                    )
                )
            except DatabaseError:
                logger.warning(
                    "Tried to import channel {channel_id}, but database file was corrupted.".format(
                        channel_id=channel_id
                    )
                )
def get_channel_name(self, obj):
    try:
        channel = ChannelMetadata.objects.get(id=obj.channel_id)
    except ChannelMetadata.DoesNotExist:
        return ""
    return channel.name
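# Hedged context: DRF resolves a SerializerMethodField by calling the matching
# get_<field_name> method, so the resolver above pairs with a field declared
# like this (the surrounding serializer class is hypothetical, not from the
# source):
from rest_framework import serializers

class ChannelNameSerializer(serializers.Serializer):
    channel_name = serializers.SerializerMethodField()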
# Fragment of an upgrade routine: `channel_ids` is assumed to hold the ids of
# channels whose content trees collide; the largest tree is kept below.
channel_sizes = {}
for channel_id in channel_ids:
    channel_sizes[channel_id] = ContentNode.objects.filter(
        channel_id=channel_id
    ).count()
# Get sorted list of ids by increasing number of nodes
sorted_channel_ids = sorted(channel_sizes, key=channel_sizes.get)
# Loop through all but the largest channel, delete and reimport
count = 0
for channel_id in sorted_channel_ids[:-1]:
    # Double check that we have a content db to import from before deleting any metadata
    if os.path.exists(get_content_database_file_path(channel_id)):
        logger.warning(
            "Deleting and reimporting channel metadata for {channel_id}".format(
                channel_id=channel_id
            )
        )
        ChannelMetadata.objects.get(
            id=channel_id
        ).delete_content_tree_and_files()
        import_channel_from_local_db(channel_id)
        logger.info(
            "Successfully reimported channel metadata for {channel_id}".format(
                channel_id=channel_id
            )
        )
        count += 1
    else:
        logger.warning(
            "Attempted to reimport channel metadata for channel {channel_id} but no content database found".format(
                channel_id=channel_id
            )
        )
if count:
    logger.info("Reimported channel metadata for {} channels".format(count))
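# Quick illustration (invented sizes) of the selection rule above: sorting ids
# by node count and dropping the last keeps the largest channel intact.
sizes = {"a": 10, "b": 3, "c": 7}
assert sorted(sizes, key=sizes.get)[:-1] == ["b", "c"]  # "a" is kept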
def get_channel_name(channel_id, require_channel=False):
    try:
        channel = ChannelMetadata.objects.get(id=channel_id)
        channel_name = channel.name
    except ChannelMetadata.DoesNotExist:
        if require_channel:
            raise serializers.ValidationError("This channel does not exist")
        channel_name = ""
    return channel_name
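# Usage sketch for the helper above: the flag switches between a silent
# fallback and a hard validation failure (the id below is illustrative).
from rest_framework import serializers

name = get_channel_name("<missing_id>")  # returns "" when the channel is absent
try:
    get_channel_name("<missing_id>", require_channel=True)
except serializers.ValidationError:
    pass  # a missing channel is treated as a validation error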
schema_mapping = {
    LocalFile: {
        # Because LocalFile does not exist on old content databases, we have to
        # override the table that we are drawing from; the
        # generate_local_file_from_file method overrides the default mapping
        # behaviour and instead reads from the File model table.
        # It then uses per_row mappers to get the required model fields from the
        # File model to populate our new LocalFiles.
        "per_table": "generate_local_file_from_file",
        "per_row": {
            "id": "checksum",
            "extension": "extension",
            "file_size": "file_size",
            "available": "get_none",
        },
    },
    ChannelMetadata: {
        "per_row": {
            ChannelMetadata._meta.get_field(
                "min_schema_version"
            ).attname: "set_version_to_no_version",
            "root_id": "root_pk",
        }
    },
}
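# Hedged sketch of how a per_row mapping like the one above can be consumed:
# each string names either an attribute on the source row or a method on the
# importer object. This resolver is an illustration of the idea, not Kolibri's
# actual import loop.
def map_row(importer, source_row, per_row):
    mapped = {}
    for dest_field, mapper in per_row.items():
        method = getattr(importer, mapper, None)
        if callable(method):
            # e.g. "get_none" or "set_version_to_no_version"
            mapped[dest_field] = method(source_row)
        else:
            # e.g. "checksum", read straight off the old File row
            mapped[dest_field] = getattr(source_row, mapper)
    return mapped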
licenses = {}
def infer_channel_id_from_source(self, source_object):
    return self.channel_id
def generate_local_file_from_file(self, SourceRecord):
    SourceRecord = self.source.get_class(File)
    checksum_record = set()
    # LocalFile objects are unique per checksum: yield each File row only once
    for record in self.source.execute(select([SourceRecord])).fetchall():
        if record.checksum not in checksum_record:
            checksum_record.add(record.checksum)
            yield record
def read_channel_metadata_from_db_file(channeldbpath):
    # import here to avoid circular imports whenever kolibri.core.content.models imports utils too
    from kolibri.core.content.models import ChannelMetadata

    source = Bridge(sqlite_file_path=channeldbpath)
    ChannelMetadataClass = source.get_class(ChannelMetadata)
    source_channel_metadata = source.session.query(ChannelMetadataClass).all()[0]
    # Use the inferred version from the SQLAlchemy Bridge object, and set it as
    # additional metadata on the channel data
    source_channel_metadata.inferred_schema_version = source.schema_version
    source.end()
    # Add a `root_id` attribute when one does not exist, to match the latest
    # schema (the older schemas call this field `root_pk`, per the mapping above).
    if not hasattr(source_channel_metadata, "root_id"):
        setattr(
            source_channel_metadata,
            "root_id",
            getattr(source_channel_metadata, "root_pk"),
        )
    return source_channel_metadata
def handle(self, *args, **options):
    self.stdout.write(format_line("Pos", "ID", "Name"))
    self.stdout.write(format_line("---", "--", "----"))
    for channel in ChannelMetadata.objects.all():
        self.stdout.write(format_line(channel.order, channel.id, channel.name))
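# Illustrative output of the listing command above (row values invented;
# exact column widths depend on the format_line helper):
#
#   Pos  ID                                Name
#   ---  --                                ----
#   1    <channel_id>                      Example Channel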
def post(self, request, *args, **kwargs):
    try:
        ids = request.data
        assert isinstance(ids, list)
        assert all(map(validate_uuid, ids))
    except AssertionError:
        raise ParseError("Array of ids not sent in body of request")
    queryset = ChannelMetadata.objects.filter(root__available=True)
    total_channels = queryset.count()
    if len(ids) != total_channels:
        raise ParseError(
            "Expected {} ids, but only received {}".format(total_channels, len(ids))
        )
    if queryset.filter_by_uuids(ids).count() != len(ids):
        raise ParseError(
            "List of ids does not match the available channels on the server"
        )
    queryset.update(
        order=Case(*(When(id=uuid, then=i + 1) for i, uuid in enumerate(ids)))
    )
    ContentCacheKey.update_cache_key()
    return Response({})
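# Hedged client-side sketch: the handler above expects the request body to be
# a JSON array containing every available channel id, in the desired order.
# The URL below is a placeholder assumption, not taken from the snippet.
import requests

response = requests.post(
    "http://localhost:8080/api/content/channelorder/",
    json=["<first_channel_id>", "<second_channel_id>"],
)
response.raise_for_status()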
def reorder_channels_upon_deletion(sender, instance=None, *args, **kwargs):
    """
    For a given channel, decrement the order of all channels that come after this channel.
    """
    if instance.order:
        ChannelMetadata.objects.filter(order__gt=instance.order).update(
            order=F("order") - 1
        )
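# The decrement above keeps channel ordering contiguous after a deletion.
# Wiring like the following is assumed for illustration (the snippet does not
# show how the handler is registered):
from django.db.models.signals import pre_delete

pre_delete.connect(reorder_channels_upon_deletion, sender=ChannelMetadata)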