:param location_id: Id of the
    :class:`~resolwe.storage.models.FileStorage` model that data
    objects reference.
:param delete: If ``True``, then delete unreferenced files.
"""
try:
    location = FileStorage.objects.get(id=location_id)
except FileStorage.DoesNotExist:
    logger.warning(
        "FileStorage location does not exist", extra={"location_id": location_id}
    )
    return

unreferenced_files = set()
purged_data = Data.objects.none()

referenced_by_data = location.data.exists()
if referenced_by_data:
    if location.data.exclude(
        status__in=[Data.STATUS_DONE, Data.STATUS_ERROR]
    ).exists():
        return

    # Perform cleanup.
    purge_files_sets = list()
    purged_data = location.data.all()
    for data in purged_data:
        purge_files_sets.append(
            get_purge_files(
                location.get_path(),
                data.output,
                data.process.output_schema,
def _get_data_attr(data, attr):
    """Get data object field."""
    if isinstance(data, dict):
        # `Data` object's id is hydrated as `__id` in expression engine
        data = data["__id"]

    data_obj = Data.objects.get(id=data)
    return getattr(data_obj, attr)
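
For clarity, a brief usage sketch (the primary key below is made up and assumes a matching `Data` row exists); the helper accepts either a raw id or the hydrated dict produced by the expression engine, where the id sits under the `__id` key:

# Usage sketch with invented values.
name_from_pk = _get_data_attr(42, "name")
name_from_hydrated = _get_data_attr({"__id": 42}, "name")
assert name_from_pk == name_from_hydrated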
def migrate_storage(self, storage):
    """Migrate storage."""
    if str(storage[u'_id']) not in self.storage_index:
        self.unreferenced_storages.append(storage[u'_id'])
        return 1

    data_id = self.storage_index[str(storage[u'_id'])]['id']
    data_path = self.storage_index[str(storage[u'_id'])]['path']
    data = Data.objects.get(pk=data_id)

    new = Storage()
    new.name = 'data_{}_storage'.format(data_id)
    new.data = data
    new.json = storage[u'json']
    new.contributor = self.get_contributor(storage[u'author_id'])
    # XXX: Django will change this on create
    new.created = storage[u'date_created']
    # XXX: Django will change this on save
    new.modified = storage[u'date_modified']
    new.save()

    dict_dot(data.output, data_path, new.pk)
    data.save()

    self.id_mapping['storage'][str(storage[u'_id'])] = new.pk
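
The `dict_dot(data.output, data_path, new.pk)` call writes the new storage's primary key into the data object's output at a dotted path. A simplified, self-contained stand-in for that setter, purely for illustration (not the actual `resolwe.flow.utils.dict_dot` implementation):

# Illustrative dotted-path setter, assumed to mirror what dict_dot does here.
def set_by_dotted_path(container, path, value):
    """Set ``value`` in nested dict ``container`` at dot-separated ``path``."""
    keys = path.split(".")
    for key in keys[:-1]:
        container = container.setdefault(key, {})
    container[keys[-1]] = value

output = {}
set_by_dotted_path(output, "fastqc.report", 17)
assert output == {"fastqc": {"report": 17}}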
        [keys to be changed]
    }
}

:param internal_call: If ``True``, this is an internal delegate
    call, so a reply to the executor won't be sent.
"""
data_id = obj[ExecutorProtocol.DATA_ID]
changeset = obj[ExecutorProtocol.UPDATE_CHANGESET]

if not internal_call:
    logger.debug(
        __("Handling update for Data with id {} (handle_update).", data_id),
        extra={"data_id": data_id, "packet": obj},
    )

try:
    d = Data.objects.get(pk=data_id)
except Data.DoesNotExist:
    logger.warning(
        "Data object does not exist (handle_update).",
        extra={"data_id": data_id},
    )

    if not internal_call:
        async_to_sync(self._send_reply)(
            obj, {ExecutorProtocol.RESULT: ExecutorProtocol.RESULT_ERROR}
        )

    async_to_sync(consumer.send_event)(
        {
            WorkerProtocol.COMMAND: WorkerProtocol.ABORT,
            WorkerProtocol.DATA_ID: obj[ExecutorProtocol.DATA_ID],
            WorkerProtocol.FINISH_COMMUNICATE_EXTRA: {
def input_(data, field_path):
    """Return a hydrated value of the ``input`` field."""
    data_obj = Data.objects.get(id=data["__id"])
    inputs = copy.deepcopy(data_obj.input)
    # XXX: Optimize by hydrating only the required field (major refactoring).
    hydrate_input_references(inputs, data_obj.process.input_schema)
    hydrate_input_uploads(inputs, data_obj.process.input_schema)
    return dict_dot(inputs, field_path)
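
A hedged usage sketch: in practice this function is exposed to process templates through the expression engine, and the id and field path below are invented for illustration (it assumes a matching `Data` row with a hydrated input at that path):

# Hypothetical direct call with made-up values.
reads_file = input_({"__id": 42}, "reads.file")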
    filter_class = ProcessFilter
    ordering_fields = ('id', 'created', 'modified', 'name', 'version')
    ordering = ('id',)


class DataViewSet(ResolweCreateDataModelMixin,
                  mixins.RetrieveModelMixin,
                  ResolweUpdateModelMixin,
                  mixins.DestroyModelMixin,
                  mixins.ListModelMixin,
                  ResolwePermissionsMixin,
                  ResolweCheckSlugMixin,
                  viewsets.GenericViewSet):
    """API view for :class:`Data` objects."""

    queryset = Data.objects.all().prefetch_related('process', 'descriptor_schema', 'contributor')
    serializer_class = DataSerializer
    permission_classes = (permissions_cls,)
    filter_class = DataFilter
    ordering_fields = ('id', 'created', 'modified', 'started', 'finished', 'name')
    ordering = ('id',)


class DescriptorSchemaViewSet(mixins.RetrieveModelMixin,
                              mixins.ListModelMixin,
                              ResolwePermissionsMixin,
                              viewsets.GenericViewSet):
    """API view for :class:`DescriptorSchema` objects."""

    queryset = DescriptorSchema.objects.all().prefetch_related('contributor')
    serializer_class = DescriptorSchemaSerializer
    permission_classes = (permissions_cls,)
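
A minimal routing sketch showing how such viewsets are typically wired up with the standard Django REST framework router; the URL prefixes here are illustrative and may differ from the project's actual urls.py:

# Register the viewsets with a DRF router (prefixes are assumptions).
from rest_framework import routers

router = routers.DefaultRouter()
router.register(r'data', DataViewSet)
router.register(r'descriptorschema', DescriptorSchemaViewSet)

urlpatterns = router.urls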
# Process all input variables.
step_input = step.get("input", {})
if not isinstance(step_input, dict):
    raise ExecutionError(
        'Incorrect definition of step "{}", input must be a dictionary.'.format(
            step_id
        )
    )

data_input = self._evaluate_expressions(
    expression_engine, step_id, step_input, context
)

# Create the data object.
data_object = Data.objects.create(
    process=process,
    contributor=data.contributor,
    tags=data.tags,
    input=data_input,
    collection=data.collection,
    subprocess_parent=data,
)

context["steps"][step_id] = data_object.pk

# Immediately set our status to done and output all data object identifiers.
data.output = {
    "steps": list(context["steps"].values()),
}
data.status = Data.STATUS_DONE
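
To make the resulting shape concrete, a small illustration of how `context["steps"]` feeds the workflow's own output; the step ids and primary keys below are invented:

# Two hypothetical workflow steps whose spawned Data objects got pks 101 and 102.
context = {"steps": {"align": 101, "quantify": 102}}
data_output = {"steps": list(context["steps"].values())}
assert data_output == {"steps": [101, 102]}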