Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def submit(self, data, runtime_dir, argv):
    """Hand the process run over to a Celery worker.

    Data objects whose process has the interactive scheduling class are
    routed to the ``hipri`` queue; everything else goes to ``ordinary``.

    For details, see
    :meth:`~resolwe.flow.managers.workload_connectors.base.BaseConnector.submit`.
    """
    is_interactive = (
        data.process.scheduling_class == Process.SCHEDULING_CLASS_INTERACTIVE
    )
    queue_name = "hipri" if is_interactive else "ordinary"

    # Record where the task is headed and whether Celery is set to run eagerly.
    message = __(
        "Connector '{}' running for Data with id {} ({}) in celery queue {}, EAGER is {}.",
        self.__class__.__module__,
        data.id,
        repr(argv),
        queue_name,
        getattr(settings, "CELERY_ALWAYS_EAGER", None),
    )
    logger.debug(message)

    task_args = (data.id, runtime_dir, argv)
    celery_run.apply_async(task_args, queue=queue_name)
getattr(d, key).extend(val)
elif key != "output":
setattr(d, key, val)
if "output" in changeset:
if not isinstance(d.output, dict):
d.output = {}
for key, val in changeset["output"].items():
dict_dot(d.output, key, val)
try:
d.save(update_fields=list(changeset.keys()))
except ValidationError as exc:
logger.error(
__(
"Validation error when saving Data object of process '{}' (handle_update):\n\n{}",
d.process.slug,
traceback.format_exc(),
),
extra={"data_id": data_id},
)
d.refresh_from_db()
d.process_error.append(exc.message)
d.status = Data.STATUS_ERROR
with suppress(Exception):
d.save(update_fields=["process_error", "status"])
except Exception:
logger.error(
__(
"Error when saving Data object of process '{}' (handle_update):\n\n{}",
def _prepare_data_dir(self, data):
"""Prepare destination directory where the data will live.
:param data: The :class:`~resolwe.flow.models.Data` object for
which to prepare the private execution directory.
:return: The prepared data directory path.
:rtype: str
"""
logger.debug(__("Preparing data directory for Data with id {}.", data.id))
with transaction.atomic():
file_storage = FileStorage.objects.create()
# Create StorageLocation with default connector.
# We must also specify status since it is Uploading by default.
data_location = StorageLocation.objects.create(
file_storage=file_storage,
url=str(file_storage.id),
status=StorageLocation.STATUS_PREPARING,
connector_name=STORAGE_LOCAL_CONNECTOR,
)
file_storage.data.add(data)
# Reference 'special' files.
for file_ in referenced_files(data, include_descriptor=False):
referenced_path = ReferencedPath.objects.create(path=file_)
def _prepare_executor(self, data, executor):
"""Copy executor sources into the destination directory.
:param data: The :class:`~resolwe.flow.models.Data` object being
prepared for.
:param executor: The fully qualified name of the executor that
is to be used for this data object.
:return: Tuple containing the relative fully qualified name of
the executor class ('relative' to how the executor will be
run) and the path to the directory where the executor will
be deployed.
:rtype: (str, str)
"""
logger.debug(__("Preparing executor for Data with id {}", data.id))
# Both of these imports are here only to get the packages' paths.
import resolwe.flow.executors as executor_package
exec_dir = os.path.dirname(inspect.getsourcefile(executor_package))
dest_dir = self._get_per_data_dir(
"RUNTIME_DIR", data.location.default_storage_location.subpath
)
dest_package_dir = os.path.join(dest_dir, "executors")
shutil.copytree(exec_dir, dest_package_dir)
dir_mode = self.settings_actual.get("FLOW_EXECUTOR", {}).get(
"RUNTIME_DIR_MODE", 0o755
)
os.chmod(dest_dir, dir_mode)
class_name = executor.rpartition(".executors.")[-1]
intersected_files = (
set.intersection(*purge_files_sets) if purge_files_sets else set()
)
unreferenced_files.update(intersected_files)
else:
# Remove data directory.
unreferenced_files.add(location.get_path())
unreferenced_files.add(
os.path.join(settings.FLOW_EXECUTOR["RUNTIME_DIR"], location.subpath)
)
if verbosity >= 1:
# Print unreferenced files
if unreferenced_files:
logger.info(
__(
"Unreferenced files for location id {} ({}):",
location_id,
len(unreferenced_files),
)
)
for name in unreferenced_files:
logger.info(__(" {}", name))
else:
logger.info(__("No unreferenced files for location id {}", location_id))
# Go through unreferenced files and delete them.
if delete:
for name in unreferenced_files:
if os.path.isfile(name) or os.path.islink(name):
os.remove(name)
elif os.path.isdir(name):
'command': 'update',
'data_id': [id of the :class:`~resolwe.flow.models.Data`
object this command changes],
'changeset': {
[keys to be changed]
}
}
:param internal_call: If ``True``, this is an internal delegate
call, so a reply to the executor won't be sent.
"""
data_id = obj[ExecutorProtocol.DATA_ID]
changeset = obj[ExecutorProtocol.UPDATE_CHANGESET]
if not internal_call:
logger.debug(
__("Handling update for Data with id {} (handle_update).", data_id),
extra={"data_id": data_id, "packet": obj},
)
try:
d = Data.objects.get(pk=data_id)
except Data.DoesNotExist:
logger.warning(
"Data object does not exist (handle_update).",
extra={"data_id": data_id,},
)
if not internal_call:
async_to_sync(self._send_reply)(
obj, {ExecutorProtocol.RESULT: ExecutorProtocol.RESULT_ERROR}
)
async_to_sync(consumer.send_event)(
"error": [error_msg],
"id": data.pk,
},
)
except Exception:
# If object's state cannot be changed due to some database-related
# issue, at least skip the object for this run.
logger.exception(
__(
"Unhandled exception in _data_scan while trying to emit error for {}.",
data.pk,
)
)
except IntegrityError as exp:
logger.error(__("IntegrityError in manager {}", exp))
return
{
WorkerProtocol.COMMAND: WorkerProtocol.ABORT,
WorkerProtocol.DATA_ID: obj[ExecutorProtocol.DATA_ID],
WorkerProtocol.FINISH_COMMUNICATE_EXTRA: {
"executor": getattr(settings, "FLOW_EXECUTOR", {}).get(
"NAME", "resolwe.flow.executors.local"
),
},
}
)
data_id = obj[ExecutorProtocol.DATA_ID]
annotations = obj[ExecutorProtocol.ANNOTATIONS]
logger.debug(
__("Handling annotate for Data with id {} (handle_annotate).", data_id),
extra={"data_id": data_id, "packet": obj},
)
try:
d = Data.objects.get(pk=data_id)
except Data.DoesNotExist:
logger.warning(
"Data object does not exist (handle_annotate).",
extra={"data_id": data_id,},
)
report_failure()
return
if d.entity is None:
logger.error(
__(
"No entity to annotate for process '{}' (handle_annotate):\n\n{}",
snapshot = self._make_stats()
try:
serialized = json.dumps(snapshot)
await self._call_redis(
aioredis.Redis.set, state.MANAGER_LISTENER_STATS, serialized
)
await self._call_redis(
aioredis.Redis.expire, state.MANAGER_LISTENER_STATS, 3600
)
except TypeError:
logger.error(
__("Listener can't serialize statistics:\n\n{}", traceback.format_exc())
)
except aioredis.RedisError:
logger.error(
__(
"Listener can't store updated statistics:\n\n{}",
traceback.format_exc(),
)