Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def create(self, request, *args, **kwargs):
    """Create a resource on behalf of the requesting user.

    If the payload contains a ``descriptor_schema`` slug, it is replaced
    by the primary key of the latest descriptor schema with that slug
    among those returned by ``get_objects_for_user`` for the
    ``view_descriptorschema`` permission. The ``contributor`` entry of
    the payload is always overwritten with the requesting user's primary
    key before delegating to the parent ``create``.

    Note that ``request.data`` is modified in place.

    :param request: the incoming request
    :return: the parent implementation's response; a 400 response when
        no matching descriptor schema is found; or a 409 response when
        the parent ``create`` raises ``IntegrityError``
    :raises exceptions.NotFound: if the user is not authenticated
    """
    user = request.user

    # ``is_authenticated`` is a method on older Django releases and a plain
    # boolean attribute on newer ones, where calling it raises
    # ``TypeError``. Accept both forms.
    authenticated = user.is_authenticated
    if callable(authenticated):
        authenticated = authenticated()
    if not authenticated:
        raise exceptions.NotFound

    ds_slug = request.data.get('descriptor_schema', None)
    if ds_slug:
        ds_query = DescriptorSchema.objects.filter(slug=ds_slug)
        ds_query = get_objects_for_user(request.user, 'view_descriptorschema', ds_query)
        try:
            # Swap the slug for the primary key of the latest match.
            request.data['descriptor_schema'] = ds_query.latest().pk
        except DescriptorSchema.DoesNotExist:
            return Response(
                {'descriptor_schema': [
                    'Invalid descriptor_schema slug "{}" - object does not exist.'.format(ds_slug)]},
                status=status.HTTP_400_BAD_REQUEST)

    # The contributor is always the requesting user, whatever was submitted.
    request.data['contributor'] = user.pk

    try:
        return super(ResolweCreateModelMixin, self).create(request, *args, **kwargs)
    except IntegrityError as ex:
        return Response({u'error': str(ex)}, status=status.HTTP_409_CONFLICT)
from resolwe.rest.fields import ProjectableJSONField
from .base import ResolweBaseSerializer
from .descriptor import DescriptorSchemaSerializer
from .fields import DictRelatedField
# Module-level logger, named after this module's ``__name__``.
logger = logging.getLogger(__name__)
class BaseCollectionSerializer(ResolweBaseSerializer):
    """Shared serializer definition for Collection objects.

    Declares the optional JSON fields, the optional descriptor schema
    relation, an optional ``data_count`` integer and a computed
    ``status`` field backed by :meth:`get_status`.
    """

    # Optional JSON payloads.
    settings = ProjectableJSONField(required=False)
    descriptor = ProjectableJSONField(required=False)

    # Optional, nullable relation to a descriptor schema.
    descriptor_schema = DictRelatedField(
        queryset=DescriptorSchema.objects.all(),
        serializer=DescriptorSchemaSerializer,
        allow_null=True,
        required=False,
    )

    data_count = serializers.IntegerField(required=False)
    status = serializers.SerializerMethodField(required=False)

    def get_status(self, collection):
        """Derive the collection status from its data object counters.

        Returns ``None`` when ``collection`` has no ``data_count``
        attribute or when that count is zero, and ``Data.STATUS_ERROR``
        when ``data_error_count`` is truthy.
        """
        # Nothing to report without a (non-zero) data count.
        if not hasattr(collection, "data_count") or collection.data_count == 0:
            return None

        if collection.data_error_count:
            return Data.STATUS_ERROR
from django.db.models import Prefetch
from rest_framework import exceptions, permissions, status, viewsets
from rest_framework.response import Response
from resolwe.flow.filters import RelationFilter
from resolwe.flow.models import Collection, DescriptorSchema, Relation
from resolwe.flow.serializers import RelationSerializer
from .mixins import ResolweCreateModelMixin
class RelationViewSet(ResolweCreateModelMixin, viewsets.ModelViewSet):
    """API view for :class:`Relation` objects.

    The base queryset joins the relation's contributor and type and
    prefetches its collection (together with the collection's contributor
    and descriptor schema) and its ``relationpartition_set``.
    """

    # Descriptor schemas of the prefetched collections, contributor joined.
    qs_collection_ds = DescriptorSchema.objects.select_related("contributor")

    # Collections with contributor joined and descriptor schema prefetched.
    qs_collection = Collection.objects.select_related(
        "contributor"
    ).prefetch_related(
        Prefetch("descriptor_schema", queryset=qs_collection_ds),
    )

    queryset = Relation.objects.all().select_related(
        "contributor", "type"
    ).prefetch_related(
        Prefetch("collection", queryset=qs_collection),
        "relationpartition_set",
    )

    serializer_class = RelationSerializer
    permission_classes = (permissions.IsAuthenticated,)
    filterset_class = RelationFilter
    ordering_fields = ("id", "created", "modified")
from resolwe.permissions.utils import update_permission
from .collection import BaseCollectionViewSet
from .utils import get_collection_for_user
class EntityViewSet(BaseCollectionViewSet):
"""API view for entities."""
# Helper querysets used by the ``Prefetch`` objects below; each one joins
# the ``contributor`` relation through ``select_related``.
qs_collection_ds = DescriptorSchema.objects.select_related("contributor")
qs_collection = Collection.objects.select_related("contributor")
qs_collection = qs_collection.prefetch_related(
"data", "entity_set", Prefetch("descriptor_schema", queryset=qs_collection_ds),
)
qs_descriptor_schema = DescriptorSchema.objects.select_related("contributor")
# Base queryset: entities with contributor joined and with their data,
# collection and descriptor schema prefetched.
queryset = Entity.objects.select_related("contributor").prefetch_related(
"data",
Prefetch("collection", queryset=qs_collection),
Prefetch("descriptor_schema", queryset=qs_descriptor_schema),
)
serializer_class = EntitySerializer
filter_class = EntityFilter
def _get_entities(self, user, ids):
"""Return entities queryset based on provided entity ids."""
# Presumably limits the requested entities to those on which ``user``
# holds the "view_entity" permission - confirm against
# ``get_objects_for_user``.
queryset = get_objects_for_user(
user, "view_entity", Entity.objects.filter(id__in=ids)
)
# Requested ids that are not present in the resulting queryset.
actual_ids = queryset.values_list("id", flat=True)
missing_ids = list(set(ids) - set(actual_ids))
# NOTE(review): the visible snippet stops here; ``missing_ids`` is not
# used and nothing is returned in the lines shown, so the method looks
# truncated - confirm against the full file.
* Add object to existing `Entity`, if all parents that are part
of it (but not necessarily all parents) are part of the same
`Entity`
* If parents belong to different `Entities` or do not belong to
any `Entity`, create new `Entity`
"""
# NOTE(review): the enclosing ``def`` is not visible in this snippet and the
# original indentation was lost, so the nesting of the statements below (in
# particular which of them belong to the ``else`` branch) must be confirmed
# against the full file.
# Slug later used to look up the ``DescriptorSchema`` of a new entity.
ds_slug = self.process.flow_collection # pylint: disable=no-member
if ds_slug:
# Distinct entities that contain at least one of this object's parents.
entity_query = Entity.objects.filter(data__in=self.parents.all()).distinct() # pylint: disable=no-member
# Exactly one such entity: reuse it.
if entity_query.count() == 1:
entity = entity_query.first()
else:
# Otherwise create a new entity that takes its contributor and name
# from this object.
descriptor_schema = DescriptorSchema.objects.get(slug=ds_slug)
entity = Entity.objects.create(
contributor=self.contributor,
descriptor_schema=descriptor_schema,
name=self.name,
)
# ``_meta.permissions`` is transposed with ``zip`` and its first row is
# iterated; presumably these are the permission codenames - confirm.
for permission in list(zip(*entity._meta.permissions))[0]: # pylint: disable=protected-access
assign_perm(permission, entity.contributor, entity)
# Add every collection returned by the ``data=self`` filter to the entity.
for collection in Collection.objects.filter(data=self):
entity.collections.add(collection)
# Attach this object to the entity.
entity.data.add(self)
from .entity import EntitySerializer
from .fields import DictRelatedField
from .process import ProcessSerializer
class DataSerializer(ResolweBaseSerializer):
"""Serializer for Data objects."""
# Optional JSON payload fields.
input = ProjectableJSONField(required=False)
output = ProjectableJSONField(required=False)
descriptor = ProjectableJSONField(required=False)
# Related process; unlike the relations below, it sets neither
# ``required=False`` nor ``allow_null=True``.
process = DictRelatedField(
queryset=Process.objects.all(), serializer=ProcessSerializer
)
# Optional, nullable relation to a descriptor schema.
descriptor_schema = DictRelatedField(
queryset=DescriptorSchema.objects.all(),
serializer=DescriptorSchemaSerializer,
allow_null=True,
required=False,
)
# Optional, nullable relation to a collection; additionally passes
# ``write_permission="edit"`` to the field.
collection = DictRelatedField(
queryset=Collection.objects.all(),
serializer=CollectionSerializer,
allow_null=True,
required=False,
write_permission="edit",
)
# Optional, nullable relation to an entity.
# NOTE(review): this call is not closed within the serializer snippet; the
# lines that follow appear to come from a different code fragment - confirm
# against the full file.
entity = DictRelatedField(
queryset=Entity.objects.all(),
serializer=EntitySerializer,
allow_null=True,
required=False,
# NOTE(review): this fragment starts in the middle of a call whose opening
# is not visible; presumably it builds ``descriptor_query`` filtered by slug
# and version - confirm against the full file. Indentation was lost, so the
# nesting of the statements below must be confirmed as well.
slug=slug, version=version
)
# A descriptor schema matching the query already exists.
if descriptor_query.exists():
if not force:
if verbosity > 0:
self.stdout.write(
"Skip descriptor schema {}: same version installed".format(
slug
)
)
# ``continue`` implies this code runs inside a loop whose header is not
# visible here.
continue
# Overwrite the matched row(s) with the values in ``descriptor_schema``.
descriptor_query.update(**descriptor_schema)
log_descriptors.append("Updated {}".format(slug))
else:
# No match: create the schema with ``user`` as its contributor.
descriptor = DescriptorSchema.objects.create(
contributor=user, **descriptor_schema
)
assign_contributor_permissions(descriptor)
# Copy permissions from ``previous_descriptor`` when one is set.
if previous_descriptor:
copy_permissions(previous_descriptor, descriptor)
log_descriptors.append("Inserted {}".format(slug))
# Print the collected log entries when there are any and verbosity allows.
if log_descriptors and verbosity > 0:
self.stdout.write("Descriptor schemas Updates:")
for log in log_descriptors:
self.stdout.write(" {}".format(log))
# NOTE(review): the ``class`` line (and possibly earlier base classes) of this
# definition is not visible in this snippet, and the final statement below is
# cut off - confirm against the full file.
ResolwePermissionsMixin,
ResolweCheckSlugMixin,
ParametersMixin,
viewsets.GenericViewSet,
):
"""API view for :class:`Data` objects."""
# Helper querysets for the ``Prefetch`` objects below; each one joins the
# ``contributor`` relation through ``select_related``.
# Collections related to the data object.
qs_collection_ds = DescriptorSchema.objects.select_related("contributor")
qs_collection = Collection.objects.select_related("contributor")
qs_collection = qs_collection.prefetch_related(
"data", "entity_set", Prefetch("descriptor_schema", queryset=qs_collection_ds),
)
qs_descriptor_schema = DescriptorSchema.objects.select_related("contributor")
# Collections reached through the entity prefetch below.
qs_entity_col_ds = DescriptorSchema.objects.select_related("contributor")
qs_entity_col = Collection.objects.select_related("contributor")
qs_entity_col = qs_entity_col.prefetch_related(
"data", "entity_set", Prefetch("descriptor_schema", queryset=qs_entity_col_ds),
)
# Entities with their data, collection and descriptor schema prefetched.
qs_entity_ds = DescriptorSchema.objects.select_related("contributor")
qs_entity = Entity.objects.select_related("contributor")
qs_entity = qs_entity.prefetch_related(
"data",
Prefetch("collection", queryset=qs_entity_col),
Prefetch("descriptor_schema", queryset=qs_entity_ds),
)
qs_process = Process.objects.select_related("contributor")
# Base queryset of the view; only its first prefetch is visible here.
queryset = Data.objects.select_related("contributor").prefetch_related(
Prefetch("collection", queryset=qs_collection),
from resolwe.flow.filters import DescriptorSchemaFilter
from resolwe.flow.models import DescriptorSchema
from resolwe.flow.serializers import DescriptorSchemaSerializer
from resolwe.permissions.loader import get_permissions_class
from resolwe.permissions.mixins import ResolwePermissionsMixin
class DescriptorSchemaViewSet(
    mixins.RetrieveModelMixin,
    mixins.ListModelMixin,
    ResolwePermissionsMixin,
    viewsets.GenericViewSet,
):
    """API view for :class:`DescriptorSchema` objects.

    Combines the retrieve and list mixins with the project's permissions
    mixin on top of a generic viewset. Results are ordered by ``id``
    unless another of the allowed ordering fields is requested.
    """

    # Join the contributor so it is loaded with each descriptor schema.
    queryset = DescriptorSchema.objects.select_related("contributor")
    serializer_class = DescriptorSchemaSerializer

    # The permission class is resolved once, when the class body executes.
    permission_classes = (get_permissions_class(),)

    filterset_class = DescriptorSchemaFilter
    ordering_fields = ("id", "created", "modified", "name", "version")
    ordering = ("id",)
def get_fields(self):
    """Return the serializer fields, adapted to the current request method.

    For ``GET`` requests ``descriptor_schema`` is a nested
    ``DescriptorSchemaSerializer``; for every other method it is an
    optional primary-key related field over all descriptor schemas.
    """
    fields = super().get_fields()

    # Pick the representation first, then install it under a single key.
    if self.request.method == "GET":
        schema_field = DescriptorSchemaSerializer(required=False)
    else:
        schema_field = serializers.PrimaryKeyRelatedField(
            queryset=DescriptorSchema.objects.all(), required=False
        )
    fields['descriptor_schema'] = schema_field

    return fields