Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def update(self, request, *args, **kwargs):
    """Update a resource, resolving a descriptor schema slug first.

    If the request payload carries a truthy ``descriptor_schema`` value, it
    is treated as a slug. The matching schemas are narrowed to those the
    requesting user holds the ``view_descriptorschema`` permission on, and
    the payload entry is replaced with the primary key of the one returned
    by ``latest()``. When no such schema exists, a 400 response describing
    the invalid slug is returned instead of performing the update.
    """
    schema_slug = request.data.get('descriptor_schema', None)

    if schema_slug:
        matching_schemas = DescriptorSchema.objects.filter(slug=schema_slug)
        visible_schemas = get_objects_for_user(
            request.user, 'view_descriptorschema', matching_schemas
        )

        try:
            request.data['descriptor_schema'] = visible_schemas.latest().pk
        except DescriptorSchema.DoesNotExist:
            message = 'Invalid descriptor_schema slug "{}" - object does not exist.'.format(schema_slug)
            return Response(
                {'descriptor_schema': [message]},
                status=status.HTTP_400_BAD_REQUEST,
            )

    # Explicit two-argument ``super`` is kept on purpose: the enclosing class
    # header is not part of this block.
    return super(ResolweUpdateModelMixin, self).update(request, *args, **kwargs)
# Fragment of a loop body that handles one ``descriptor_schema`` mapping; the
# enclosing function and loop header are outside this excerpt.
slug = descriptor_schema["slug"]
# A schema without an explicit "version" key is treated as version "0.0.0".
version = descriptor_schema.get("version", "0.0.0")
int_version = convert_version_string_to_int(version, VERSION_NUMBER_BITS)
# `latest version` is returned as `int` so it has to be compared to `int_version`
latest_version = DescriptorSchema.objects.filter(slug=slug).aggregate(
Max("version")
)["version__max"]
# The ``is not None`` guard covers the case where the aggregate has no value
# (presumably: no schema with this slug is stored yet). Only a strictly newer
# stored version causes this schema to be skipped.
if latest_version is not None and latest_version > int_version:
self.stderr.write(
"Skip descriptor schema {}: newer version installed".format(slug)
)
continue
# Remember the schema returned by ``latest()`` for this slug, or None when
# no schema with this slug exists.
previous_descriptor_qs = DescriptorSchema.objects.filter(slug=slug)
if previous_descriptor_qs.exists():
previous_descriptor = previous_descriptor_qs.latest()
else:
previous_descriptor = None
# NOTE(review): this lookup filters on the string ``version`` rather than
# ``int_version``; presumably the model field converts it - confirm.
descriptor_query = DescriptorSchema.objects.filter(
slug=slug, version=version
)
# Same slug and version already stored: without ``force`` a skip notice is
# written to stdout when ``verbosity`` is positive (the rest of this branch
# lies outside the excerpt).
if descriptor_query.exists():
if not force:
if verbosity > 0:
self.stdout.write(
"Skip descriptor schema {}: same version installed".format(
slug
)
)
class BaseCollectionViewSet(
ResolweCreateModelMixin,
mixins.RetrieveModelMixin,
ResolweUpdateModelMixin,
mixins.DestroyModelMixin,
mixins.ListModelMixin,
ResolwePermissionsMixin,
ResolweCheckSlugMixin,
ParametersMixin,
viewsets.GenericViewSet,
):
"""Base API view for :class:`Collection` objects."""
# Descriptor schemas are loaded together with their contributor and attached
# to the collections through the ``Prefetch`` below.
qs_descriptor_schema = DescriptorSchema.objects.select_related("contributor")
# Collections are loaded with their contributor joined in and their
# descriptor schema prefetched.
queryset = Collection.objects.select_related("contributor").prefetch_related(
Prefetch("descriptor_schema", queryset=qs_descriptor_schema),
)
filter_class = CollectionFilter
permission_classes = (get_permissions_class(),)
# Field names listed for ordering - presumably consumed by the REST
# framework's ordering filter; the tuple continues beyond this excerpt.
ordering_fields = (
"contributor",
"contributor__first_name",
"contributor__last_name",
"created",
"id",
"modified",
"name",
def migrate_data(self, data):
"""Migrate data.

Only the first part of this method is visible in this excerpt. That part:

* resolves the contributor from ``data['author_id']``;
* builds a descriptor schema from the ``static_schema`` and
  ``var_template`` field lists, or reuses a previously created one with
  identical fields;
* merges the ``static`` and ``var`` mappings into a single descriptor;
* sets ``data['processor_version']`` to ``'0.0.0'`` in place when the key
  is missing, then derives the process slug and version.

:param data: mapping describing one record to migrate; the keys read
    here are ``author_id``, ``_id``, ``processor_name`` and the optional
    ``static_schema``, ``var_template``, ``static``, ``var`` and
    ``processor_version``.
"""
contributor = self.get_contributor(data[u'author_id'])
# DESCRIPTOR SCHEMA ############################################
ds_fields = []
ds_fields.extend(data.get(u'static_schema', []))
ds_fields.extend(data.get(u'var_template', []))
# Sort by field name so the JSON dump below is independent of the order in
# which the fields were listed.
ds_fields.sort(key=lambda d: d[u'name'])
# The JSON dump of the sorted fields is the key of the schema index, so
# records with identical fields share one DescriptorSchema.
ds_fields_dumped = json.dumps(ds_fields)
if ds_fields_dumped in self.descriptor_schema_index:
descriptor_schema = self.descriptor_schema_index[ds_fields_dumped]
else:
descriptor_schema = DescriptorSchema(schema=ds_fields)
descriptor_schema.name = 'data_{}_descriptor'.format(data[u'_id'])
descriptor_schema.contributor = contributor
descriptor_schema.save()
self.descriptor_schema_index[ds_fields_dumped] = descriptor_schema
# ``var`` is applied after ``static``, so its values win on duplicate keys.
descriptor = {}
descriptor.update(data.get(u'static', {}))
descriptor.update(data.get(u'var', {}))
# PROCESS ######################################################
# Note: this writes the default version back into the caller's ``data``.
if u'processor_version' not in data:
data[u'processor_version'] = '0.0.0'
process_slug = self.process_slug(data[u'processor_name'])
process_version = data[u'processor_version']
# Interior of the base collection model; its ``class`` header is outside this
# excerpt.
class Meta(BaseModel.Meta):
"""BaseCollection Meta options."""
# Abstract model: it only serves as a base for concrete models.
abstract = True
#: detailed description
description = models.TextField(blank=True)
#: JSON settings; defaults to an empty dict
settings = JSONField(default=dict)
#: processes linked to this collection (many-to-many)
public_processes = models.ManyToManyField(Process)
#: data objects linked to this collection (many-to-many)
data = models.ManyToManyField(Data)
#: collection descriptor schema
# Optional reference; ``PROTECT`` blocks deleting a schema that is still
# referenced here.
descriptor_schema = models.ForeignKey(DescriptorSchema, blank=True, null=True, on_delete=models.PROTECT)
#: collection descriptor
descriptor = JSONField(default=dict)
class Collection(BaseCollection):
"""Postgres model for storing a collection."""
class Meta(BaseCollection.Meta):
"""Collection Meta options."""
# Custom model permissions as (codename, human-readable name) pairs; the
# tuple continues beyond this excerpt.
permissions = (
("view_collection", "Can view collection"),
("edit_collection", "Can edit collection"),
("share_collection", "Can share collection"),
("download_collection", "Can download files from collection"),
from rest_framework.response import Response
from resolwe.flow.filters import EntityFilter
from resolwe.flow.models import Collection, DescriptorSchema, Entity
from resolwe.flow.serializers import EntitySerializer
from resolwe.permissions.shortcuts import get_objects_for_user
from resolwe.permissions.utils import update_permission
from .collection import BaseCollectionViewSet
from .utils import get_collection_for_user
class EntityViewSet(BaseCollectionViewSet):
"""API view for entities."""
# Queryset used to prefetch each entity's collection: the collection's
# contributor is joined in, and its data, entity set and descriptor schema
# (with that schema's contributor) are prefetched.
qs_collection_ds = DescriptorSchema.objects.select_related("contributor")
qs_collection = Collection.objects.select_related("contributor")
qs_collection = qs_collection.prefetch_related(
"data", "entity_set", Prefetch("descriptor_schema", queryset=qs_collection_ds),
)
# Entities are loaded with their contributor joined in; their data,
# collection and descriptor schema are prefetched.
qs_descriptor_schema = DescriptorSchema.objects.select_related("contributor")
queryset = Entity.objects.select_related("contributor").prefetch_related(
"data",
Prefetch("collection", queryset=qs_collection),
Prefetch("descriptor_schema", queryset=qs_descriptor_schema),
)
serializer_class = EntitySerializer
filter_class = EntityFilter
# The body of this method lies outside this excerpt.
def _get_entities(self, user, ids):