def manager_post_save_handler(sender, instance, created, **kwargs):
    """Run newly created (spawned) processes."""
    if (
        instance.status == Data.STATUS_DONE
        or instance.status == Data.STATUS_ERROR
        or created
    ):
        # Run the manager at the end of the potential transaction. Otherwise
        # tasks are sent to workers before the transaction ends and therefore
        # workers cannot access objects created inside the transaction.
        transaction.on_commit(lambda: commit_signal(instance.id))
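
A minimal sketch of how a handler like this is typically registered with Django's post_save signal; wiring it up this way (e.g. from an AppConfig.ready() method) is an assumption, not something shown in the snippet above.

from django.db.models.signals import post_save

# Hypothetical registration: fire the handler whenever a Data object is saved.
post_save.connect(manager_post_save_handler, sender=Data)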
    - ``STATUS_DONE`` .. all dependencies have done status
    - ``None`` .. other

    """
    parents_statuses = set(
        DataDependency.objects.filter(child=data, kind=DataDependency.KIND_IO)
        .distinct("parent__status")
        .values_list("parent__status", flat=True)
    )

    if not parents_statuses:
        return Data.STATUS_DONE

    if None in parents_statuses:
        # Some parents have been deleted.
        return Data.STATUS_ERROR

    if Data.STATUS_ERROR in parents_statuses:
        return Data.STATUS_ERROR

    if len(parents_statuses) == 1 and Data.STATUS_DONE in parents_statuses:
        return Data.STATUS_DONE

    return None
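
A hedged sketch of how the abstracted dependency status computed above can gate dispatch; the dependency_status and dispatch names and the surrounding loop are illustrative assumptions (a later fragment below performs the same dep_status checks in context).

for data in Data.objects.filter(status=Data.STATUS_RESOLVING):
    dep_status = dependency_status(data)  # hypothetical name for the helper above
    if dep_status == Data.STATUS_ERROR:
        # A parent failed or was deleted; propagate the error instead of dispatching.
        data.status = Data.STATUS_ERROR
        data.save()
    elif dep_status != Data.STATUS_DONE:
        # Some parents are still running; revisit this object on a later pass.
        continue
    else:
        dispatch(data)  # hypothetical: hand the object to the workload scheduler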
            data.status = Data.STATUS_DONE
            data.save()
            validate_data_object(data)
        except ValidationError as exc:
            logger.error(
                __(
                    "Validation error when saving Data object of process '{}' (handle_referenced_files):\n\n{}",
                    data.process.slug,
                    traceback.format_exc(),
                ),
                extra={"data_id": data_id},
            )
            data.refresh_from_db()
            data.process_error.append(exc.message)
            data.status = Data.STATUS_ERROR
            with suppress(Exception):
                data.save(update_fields=["process_error", "status"])
        except Exception:
            logger.exception(
                "Exception while setting data status to DONE (handle_referenced_files).",
                extra={"data_id": data_id},
            )
            self._abort_processing(obj)
            return

        async_to_sync(self._send_reply)(
            obj, {ExecutorProtocol.RESULT: ExecutorProtocol.RESULT_OK}
        )
        return
        changeset = {
            "process_progress": 100,
            "finished": now(),
        }

        if spawning_failed:
            changeset["status"] = Data.STATUS_ERROR
            changeset["process_error"] = [
                "Error while preparing spawned Data objects"
            ]
        elif process_rc != 0:
            changeset["status"] = Data.STATUS_ERROR
            changeset["process_rc"] = process_rc

        obj[ExecutorProtocol.UPDATE_CHANGESET] = changeset
        self.handle_update(obj, internal_call=True)

        # Notify the executor that we're done.
        async_to_sync(self._send_reply)(
            obj, {ExecutorProtocol.RESULT: ExecutorProtocol.RESULT_OK}
        )

        # Now nudge the main manager to perform final cleanup. This is
        # needed even if there was no spawn baggage, since the manager
        # may need to know when executors have finished, to keep count
        # of them and manage synchronization.
        async_to_sync(consumer.send_event)(
            {
"""
parents_statuses = set(
DataDependency.objects.filter(child=data, kind=DataDependency.KIND_IO)
.distinct("parent__status")
.values_list("parent__status", flat=True)
)
if not parents_statuses:
return Data.STATUS_DONE
if None in parents_statuses:
# Some parents have been deleted.
return Data.STATUS_ERROR
if Data.STATUS_ERROR in parents_statuses:
return Data.STATUS_ERROR
if len(parents_statuses) == 1 and Data.STATUS_DONE in parents_statuses:
return Data.STATUS_DONE
return None
    def get_status(self, collection):
        """Return status of the collection based on the status of data objects."""
        if not hasattr(collection, "data_count"):
            return None
        if collection.data_count == 0:
            return None
        if collection.data_error_count:
            return Data.STATUS_ERROR
        if collection.data_uploading_count:
            return Data.STATUS_UPLOADING
        if collection.data_processing_count:
            return Data.STATUS_PROCESSING
        if collection.data_preparing_count:
            return Data.STATUS_PREPARING
        if collection.data_waiting_count:
            return Data.STATUS_WAITING
        if collection.data_resolving_count:
            return Data.STATUS_RESOLVING
        if collection.data_done_count == collection.data_count:
            return Data.STATUS_DONE
        logger.warning(
            "Could not determine the status of a collection.",
            extra={"collection": collection.__dict__},
        )
"data", distinct=True, filter=Q(data__status=Data.STATUS_RESOLVING)
),
data_waiting_count=Count(
"data", distinct=True, filter=Q(data__status=Data.STATUS_WAITING)
),
data_preparing_count=Count(
"data", distinct=True, filter=Q(data__status=Data.STATUS_PREPARING)
),
data_processing_count=Count(
"data", distinct=True, filter=Q(data__status=Data.STATUS_PROCESSING)
),
data_done_count=Count(
"data", distinct=True, filter=Q(data__status=Data.STATUS_DONE)
),
data_error_count=Count(
"data", distinct=True, filter=Q(data__status=Data.STATUS_ERROR)
),
)
return self.queryset
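
The annotation fragment above begins mid-call; a hedged reconstruction of its assumed shape, including the data_count and data_uploading_count annotations that get_status reads but the visible snippet does not show (the get_queryset name is an assumption used only for illustration):

    from django.db.models import Count, Q

    def get_queryset(self):
        """Annotate collections with per-status data counts (assumed shape)."""
        self.queryset = self.queryset.annotate(
            data_count=Count("data", distinct=True),
            data_uploading_count=Count(
                "data", distinct=True, filter=Q(data__status=Data.STATUS_UPLOADING)
            ),
            # ... followed by the resolving/waiting/preparing/processing/done/error
            # counts exactly as in the fragment above ...
        )
        return self.queryset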
        if changeset.get("status", None) == Data.STATUS_ERROR:
            logger.error(
                __(
                    "Error occurred while running process '{}' (handle_update).",
                    d.process.slug,
                ),
                extra={
                    "data_id": data_id,
                    "api_url": "{}{}".format(
                        getattr(settings, "RESOLWE_HOST_URL", ""),
                        reverse("resolwe-api:data-detail", kwargs={"pk": data_id}),
                    ),
                },
            )

        if d.status == Data.STATUS_ERROR:
            changeset["status"] = Data.STATUS_ERROR

        if not d.started:
            changeset["started"] = now()
        changeset["modified"] = now()

        for key, val in changeset.items():
            if key in ["process_error", "process_warning", "process_info"]:
                # Trim process_* fields so they do not exceed the maximum length
                # of the database field.
                for i, entry in enumerate(val):
                    max_length = Data._meta.get_field(key).base_field.max_length
                    if len(entry) > max_length:
                        val[i] = entry[: max_length - 3] + "..."
                getattr(d, key).extend(val)
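
A small self-contained illustration of the trimming above; the _trim helper and the max_length of 12 are hypothetical and only mirror the loop body:

def _trim(entries, max_length=12):
    # Trim each entry to max_length, reserving three characters for the ellipsis.
    return [e if len(e) <= max_length else e[: max_length - 3] + "..." for e in entries]

print(_trim(["short", "a considerably longer error message"]))
# ['short', 'a conside...']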
                return
            elif dep_status != Data.STATUS_DONE:
                return

            if data.process.run:
                try:
                    execution_engine = data.process.run.get("language", None)
                    # Evaluation by the execution engine may spawn additional data objects and
                    # perform other queries on the database. Queries of all possible execution
                    # engines need to be audited for possibilities of deadlocks in case any
                    # additional locks are introduced. Currently, we only take an explicit lock on
                    # the currently processing object.
                    program = self.get_execution_engine(execution_engine).evaluate(data)
                except (ExecutionError, InvalidEngineError) as error:
                    data.status = Data.STATUS_ERROR
                    data.process_error.append(
                        "Error in process script: {}".format(error)
                    )
                    data.save()
                    return

                # Set allocated resources:
                resource_limits = data.process.get_resource_limits()
                data.process_memory = resource_limits["memory"]
                data.process_cores = resource_limits["cores"]
            else:
                # If there is no run section, we should not try to run anything, but the
                # program must not be set to None, since the process would then be stuck
                # in the waiting state.
                program = ""
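
For context, a hedged example of the kind of run section the code above inspects; only the "language" lookup is visible in the snippet, so the bash value and the "program" key are illustrative assumptions:

run_section = {
    "language": "bash",         # read via data.process.run.get("language", None)
    "program": "echo 'hello'",  # hypothetical payload evaluated by the execution engine
}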
    :param delete: If ``True``, then delete unreferenced files.
    """
    try:
        location = FileStorage.objects.get(id=location_id)
    except FileStorage.DoesNotExist:
        logger.warning(
            "FileStorage location does not exist", extra={"location_id": location_id}
        )
        return

    unreferenced_files = set()
    purged_data = Data.objects.none()

    referenced_by_data = location.data.exists()
    if referenced_by_data:
        if location.data.exclude(
            status__in=[Data.STATUS_DONE, Data.STATUS_ERROR]
        ).exists():
            return

        # Perform cleanup.
        purge_files_sets = list()
        purged_data = location.data.all()
        for data in purged_data:
            purge_files_sets.append(
                get_purge_files(
                    location.get_path(),
                    data.output,
                    data.process.output_schema,
                    data.descriptor,
                    getattr(data.descriptor_schema, "schema", []),
                )
            )
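
A hedged sketch of how the per-data purge sets collected above might be combined; treating a file as removable only when it is unreferenced by every Data object in the location is an assumption about the elided continuation, not code from the snippet:

        if purge_files_sets:
            # A file is safe to purge only if it shows up in every per-object set,
            # i.e. no Data object in this location still references it.
            unreferenced_files.update(set.intersection(*purge_files_sets))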