Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
elif data.get("id", None) is not None:
kwargs = {"id": data["id"]}
elif data.get("slug", None) is not None:
if self.root.instance:
# ``self.root.instance != None`` means that an instance is
# already present, so this is not "create" request.
self.fail("slug_not_allowed")
kwargs = {"slug": data["slug"]}
else:
self.fail("null", name=self.field_name)
user = getattr(self.context.get("request"), "user")
queryset = self.get_queryset()
permission = get_full_perm(self.write_permission, queryset.model)
try:
return get_objects_for_user(
user, permission, queryset.filter(**kwargs)
).latest("version")
except ObjectDoesNotExist:
# Differentiate between "user has no permission" and "object does not exist"
view_permission = get_full_perm("view", queryset.model)
if permission != view_permission:
try:
get_objects_for_user(
user, view_permission, queryset.filter(**kwargs)
).latest("version")
raise exceptions.PermissionDenied(
"You do not have {} permission for {}: {}.".format(
self.write_permission, self.model_name, data
)
)
except ObjectDoesNotExist:
else:
self.fail("null", name=self.field_name)
user = getattr(self.context.get("request"), "user")
queryset = self.get_queryset()
permission = get_full_perm(self.write_permission, queryset.model)
try:
return get_objects_for_user(
user, permission, queryset.filter(**kwargs)
).latest("version")
except ObjectDoesNotExist:
# Differentiate between "user has no permission" and "object does not exist"
view_permission = get_full_perm("view", queryset.model)
if permission != view_permission:
try:
get_objects_for_user(
user, view_permission, queryset.filter(**kwargs)
).latest("version")
raise exceptions.PermissionDenied(
"You do not have {} permission for {}: {}.".format(
self.write_permission, self.model_name, data
)
)
except ObjectDoesNotExist:
pass
self.fail(
"does_not_exist", value=smart_text(data), model_name=self.model_name
)
def _parents_children(self, request, queryset):
"""Process given queryset and return serialized objects."""
queryset = get_objects_for_user(request.user, "view_data", queryset)
page = self.paginate_queryset(queryset)
if page is not None:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
# perform "get_or_create" if requested - return existing object
# if found
if kwargs.pop('get_or_create', False):
process_input = request.data.get('input', {})
# use default values if they are not given
for field_schema, fields, path in iterate_schema(process_input, process.input_schema):
if 'default' in field_schema and field_schema['name'] not in fields:
dict_dot(process_input, path, field_schema['default'])
checksum = get_data_checksum(process_input, process.slug, process.version)
data_qs = Data.objects.filter(
checksum=checksum,
process__persistence__in=[Process.PERSISTENCE_CACHED, Process.PERSISTENCE_TEMP],
)
data_qs = get_objects_for_user(request.user, 'view_data', data_qs)
if data_qs.exists():
data = data_qs.order_by('created').last()
serializer = self.get_serializer(data)
return Response(serializer.data)
# create the objects
resp = super(ResolweCreateDataModelMixin, self).create(request, *args, **kwargs)
# run manager
manager.communicate()
return resp
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
process = serializer.validated_data.get("process")
process_input = request.data.get("input", {})
fill_with_defaults(process_input, process.input_schema)
checksum = get_data_checksum(process_input, process.slug, process.version)
data_qs = Data.objects.filter(
checksum=checksum,
process__persistence__in=[
Process.PERSISTENCE_CACHED,
Process.PERSISTENCE_TEMP,
],
)
data_qs = get_objects_for_user(request.user, "view_data", data_qs)
if data_qs.exists():
data = data_qs.order_by("created").last()
serializer = self.get_serializer(data)
return Response(serializer.data)
def duplicate(self, request, *args, **kwargs):
"""Duplicate (make copy of) ``Entity`` models."""
if not request.user.is_authenticated:
raise exceptions.NotFound
inherit_collection = request.data.get("inherit_collection", False)
ids = self.get_ids(request.data)
queryset = get_objects_for_user(
request.user, "view_entity", Entity.objects.filter(id__in=ids)
)
actual_ids = queryset.values_list("id", flat=True)
missing_ids = list(set(ids) - set(actual_ids))
if missing_ids:
raise exceptions.ParseError(
"Entities with the following ids not found: {}".format(
", ".join(map(str, missing_ids))
)
)
duplicated = queryset.duplicate(
contributor=request.user, inherit_collection=inherit_collection
)
serializer = self.get_serializer(duplicated, many=True)
def duplicate(self, request, *args, **kwargs):
"""Duplicate (make copy of) ``Data`` objects."""
if not request.user.is_authenticated:
raise exceptions.NotFound
inherit_collection = request.data.get("inherit_collection", False)
ids = self.get_ids(request.data)
queryset = get_objects_for_user(
request.user, "view_data", Data.objects.filter(id__in=ids)
)
actual_ids = queryset.values_list("id", flat=True)
missing_ids = list(set(ids) - set(actual_ids))
if missing_ids:
raise exceptions.ParseError(
"Data objects with the following ids not found: {}".format(
", ".join(map(str, missing_ids))
)
)
duplicated = queryset.duplicate(
contributor=request.user, inherit_collection=inherit_collection,
)
serializer = self.get_serializer(duplicated, many=True)
def _get_entities(self, user, ids):
"""Return entities queryset based on provided entity ids."""
queryset = get_objects_for_user(
user, "view_entity", Entity.objects.filter(id__in=ids)
)
actual_ids = queryset.values_list("id", flat=True)
missing_ids = list(set(ids) - set(actual_ids))
if missing_ids:
raise exceptions.ParseError(
"Entities with the following ids not found: {}".format(
", ".join(map(str, missing_ids))
)
)
return queryset
app_label = queryset.model._meta.app_label
model_name = queryset.model._meta.model_name
kwargs = {}
if model_name == "storage":
model_name = "data"
kwargs["perms_filter"] = "data__pk__in"
if model_name == "relation":
model_name = "collection"
kwargs["perms_filter"] = "collection__pk__in"
permission = "{}.view_{}".format(app_label, model_name)
return get_objects_for_user(user, permission, queryset, **kwargs)