django_settings.update(kwargs)
files[ExecutorFiles.DJANGO_SETTINGS] = django_settings
# Add scheduling classes.
files[ExecutorFiles.PROCESS_META] = {
k: getattr(Process, k)
for k in dir(Process)
if k.startswith("SCHEDULING_CLASS_")
and isinstance(getattr(Process, k), str)
}
# Add Data status constants.
files[ExecutorFiles.DATA_META] = {
k: getattr(Data, k)
for k in dir(Data)
if k.startswith("STATUS_") and isinstance(getattr(Data, k), str)
}
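# For illustration only: the two dictionaries built above hold the constant
# names defined on the model classes mapped to their short codes, roughly of
# this shape (the example values are assumed here, not taken from this file):
#
#   files[ExecutorFiles.PROCESS_META] == {"SCHEDULING_CLASS_BATCH": "BA", ...}
#   files[ExecutorFiles.DATA_META] == {"STATUS_DONE": "OK", "STATUS_ERROR": "ER", ...}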
# Prepare storage connectors settings and secrets.
connectors_settings = copy.deepcopy(STORAGE_CONNECTORS)
# Local connector in executor is always named 'local'.
connectors_settings["local"] = connectors_settings.pop(STORAGE_LOCAL_CONNECTOR)
for connector_settings in connectors_settings.values():
# Fix class name for inclusion in the executor.
klass = connector_settings["connector"]
klass = "executors." + klass.rsplit(".storage.")[-1]
connector_settings["connector"] = klass
connector_config = connector_settings["config"]
# Prepare credentials for executor.
if "credentials" in connector_config:
src_credentials = connector_config["credentials"]
base_credentials_name = os.path.basename(src_credentials)
return
# Set allocated resources:
resource_limits = data.process.get_resource_limits()
data.process_memory = resource_limits["memory"]
data.process_cores = resource_limits["cores"]
else:
# If there is no run section, we should not try to run anything. However, the
# program must not be set to None, because the process would then be stuck in
# the waiting state.
program = ""
if data.status != Data.STATUS_DONE:
# The data object may already be marked as done by the execution engine. In this
# case we must not revert the status to STATUS_WAITING.
data.status = Data.STATUS_WAITING
data.save(render_name=True)
# Actually run the object only if nothing went wrong with the transaction.
transaction.on_commit(
# Make sure the closure gets the right values here, since they're
# changed in the loop.
lambda d=data, p=program: self._data_execute(d, p, executor)
)
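# Illustrative aside (not part of this function): binding the loop variables as
# lambda default arguments is what makes each on_commit callback see the data
# object and program from its own iteration. Without the defaults, every
# callback would share the loop variables and run with the values from the
# final iteration:
#
#   callbacks = [lambda: i for i in range(3)]       # every call returns 2
#   callbacks = [lambda i=i: i for i in range(3)]   # calls return 0, 1, 2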
}
}
:param internal_call: If ``True``, this is an internal delegate
call, so a reply to the executor won't be sent.
"""
data_id = obj[ExecutorProtocol.DATA_ID]
changeset = obj[ExecutorProtocol.UPDATE_CHANGESET]
if not internal_call:
logger.debug(
__("Handling update for Data with id {} (handle_update).", data_id),
extra={"data_id": data_id, "packet": obj},
)
try:
d = Data.objects.get(pk=data_id)
except Data.DoesNotExist:
logger.warning(
"Data object does not exist (handle_update).",
extra={"data_id": data_id,},
)
if not internal_call:
async_to_sync(self._send_reply)(
obj, {ExecutorProtocol.RESULT: ExecutorProtocol.RESULT_ERROR}
)
async_to_sync(consumer.send_event)(
{
WorkerProtocol.COMMAND: WorkerProtocol.ABORT,
WorkerProtocol.DATA_ID: obj[ExecutorProtocol.DATA_ID],
WorkerProtocol.FINISH_COMMUNICATE_EXTRA: {
"executor": getattr(settings, "FLOW_EXECUTOR", {}).get(
# Create a dummy processor if no matching one exists.
dummy_name = 'Dummy processor of type {}'.format(data[u'type'])
try:
process = Process.objects.get(name=dummy_name)
except Process.DoesNotExist:
process = Process.objects.create(
name=dummy_name,
slug='non-existent',
contributor=get_user_model().objects.filter(is_superuser=True).first(),
type=data[u'type'],
category='data:non-existent',
run={'script': 'gen-require common\ngen-error "This processor is not intended to be run."'},
)
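# Design note: the try/except above does roughly what a single
# Process.objects.get_or_create call would do; a sketch of that variant
# (field list abbreviated, not taken from this file) would be:
#
#   process, _ = Process.objects.get_or_create(
#       name=dummy_name,
#       defaults={'slug': 'non-existent', 'type': data[u'type']},
#   )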
# DATA #########################################################
new = Data()
new.name = data.get(u'static', {}).get(u'name', '')
if len(new.name) > 100:
self.long_names.append(new.name)
new.name = new.name[:97] + '...'
new.status = self.status_dict[data[u'status']]
new.process = process
new.contributor = contributor
new.input = data[u'input'] if u'input' in data else {}
new.output = data[u'output']
new.descriptor_schema = descriptor_schema
new.descriptor = descriptor
new.checksum = data.get(u'checksum', '')
# XXX: Django will change this on create
new.created = data[u'date_created']
# XXX: Django will change this on save
new.modified = data[u'date_modified']
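# One way to keep the imported timestamps (a sketch, not taken from this file):
# save the object first, then write the dates back with a queryset update,
# which bypasses the auto_now/auto_now_add handling in Model.save():
#
#   new.save()
#   Data.objects.filter(pk=new.pk).update(
#       created=data[u'date_created'], modified=data[u'date_modified']
#   )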
def process_data_object(data):
"""Process a single data object."""
# Lock for update. Note that we want this transaction to be as short as possible in
# order to reduce contention and avoid deadlocks. This is why we do not lock all
# resolving objects for update, but instead only lock one object at a time. This
# allows managers running in parallel to process different objects.
data = Data.objects.select_for_update().get(pk=data.pk)
if data.status != Data.STATUS_RESOLVING:
# The object might have already been processed while waiting for the lock to be
# obtained. In this case, skip the object.
return
dep_status = dependency_status(data)
if dep_status == Data.STATUS_ERROR:
data.status = Data.STATUS_ERROR
data.process_error.append("One or more inputs have status ERROR")
data.process_rc = 1
data.save()
return
elif dep_status != Data.STATUS_DONE:
return
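# A minimal sketch of what a dependency_status helper could look like, assuming
# parent data objects are reachable through a ``parents`` relation (the actual
# helper is not shown in this fragment, so this is illustrative only): report
# ERROR if any dependency failed, DONE if all of them finished, None otherwise.
def dependency_status_sketch(data):
    """Return the combined status of the dependencies of ``data``."""
    statuses = set(data.parents.values_list("status", flat=True))
    if Data.STATUS_ERROR in statuses:
        return Data.STATUS_ERROR
    if statuses <= {Data.STATUS_DONE}:
        return Data.STATUS_DONE
    return None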
"data", distinct=True, filter=Q(data__status=Data.STATUS_UPLOADING)
),
data_resolving_count=Count(
"data", distinct=True, filter=Q(data__status=Data.STATUS_RESOLVING)
),
data_waiting_count=Count(
"data", distinct=True, filter=Q(data__status=Data.STATUS_WAITING)
),
data_preparing_count=Count(
"data", distinct=True, filter=Q(data__status=Data.STATUS_PREPARING)
),
data_processing_count=Count(
"data", distinct=True, filter=Q(data__status=Data.STATUS_PROCESSING)
),
data_done_count=Count(
"data", distinct=True, filter=Q(data__status=Data.STATUS_DONE)
),
data_error_count=Count(
"data", distinct=True, filter=Q(data__status=Data.STATUS_ERROR)
),
)
return self.queryset
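# For illustration, each annotation above follows the same filtered-count
# pattern; consuming the counts could look like this (``Collection`` is an
# assumed model name, the real queryset is whatever ``self.queryset`` holds):
#
#   queryset = Collection.objects.annotate(
#       data_done_count=Count(
#           "data", distinct=True, filter=Q(data__status=Data.STATUS_DONE)
#       ),
#   )
#   for collection in queryset:
#       print(collection.name, collection.data_done_count)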
try:
inputs = data.input.copy()
hydrate_input_references(inputs, data.process.input_schema)
hydrate_input_uploads(inputs, data.process.input_schema)
inputs['proc'] = {
'data_id': data.id,
'data_dir': settings.FLOW_EXECUTOR['DATA_DIR'],
}
script_template = data.process.run.get('bash', '')
script = render_template(script_template, template.Context(inputs))
except template.TemplateSyntaxError as ex:
data.status = Data.STATUS_ERROR
data.process_error.append('Error in process script: {}'.format(ex))
data.save()
return None
return script
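# A caller of this block would presumably treat the None return as "nothing to
# run"; for example (the wrapper name is hypothetical):
#
#   script = self._render_script(data)
#   if script is None:
#       return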
# same time, the lock for the current data object is released.
except Exception as error:
logger.exception(
__(
"Unhandled exception in _data_scan while processing data object {}.",
data.pk,
)
)
# Unhandled error while processing a data object. We must set its
# status to STATUS_ERROR to prevent the object from being retried
# on next _data_scan run. We must perform this operation without
# using the Django ORM as using the ORM may be the reason the error
# occurred in the first place.
error_msg = "Internal error: {}".format(error)
process_error_field = Data._meta.get_field("process_error")
max_length = process_error_field.base_field.max_length
if len(error_msg) > max_length:
error_msg = error_msg[: max_length - 3] + "..."
try:
with connection.cursor() as cursor:
cursor.execute(
"""
UPDATE {table}
SET
status = %(status)s,
process_error = process_error || (%(error)s)::varchar[]
WHERE id = %(id)s
""".format(
table=Data._meta.db_table
),
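# The execute() call is cut off here; judging from the %(status)s, %(error)s
# and %(id)s placeholders in the SQL above, it is presumably completed with a
# parameter mapping along these lines (an assumption, not shown in this
# fragment):
#
#   {"status": Data.STATUS_ERROR, "error": [error_msg], "id": data.pk}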