def migrate_data_references(self):
    """Migrate data references."""
    def map_reference(reference):
        """Map references to new IDs."""
        try:
            return self.id_mapping['data'][reference]
        except KeyError as error:
            # KeyError has no `.message` attribute on Python 3; record the missing key itself.
            self.missing_data.add(error.args[0])  # pylint: disable=no-member
            return None

    # Fix references in JSON documents in the second pass.
    for new_id in self.id_mapping['data'].values():
        data = Data.objects.get(pk=new_id)
        for field_schema, fields in iterate_fields(data.input, data.process.input_schema):
            if 'type' not in field_schema:
                continue
            name = field_schema['name']
            value = fields[name]
            if field_schema['type'].startswith('data:'):
                fields[name] = map_reference(value)
            elif field_schema['type'].startswith('list:data:'):
                # Materialize the mapping; a bare `map` object is not JSON-serializable on Python 3.
                fields[name] = [map_reference(val) for val in value]
        data.save()
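
# --- Hedged sketch (added, not from the snippet above) -------------------------
# Minimal standalone illustration of the reference-mapping idea: look up each old
# data ID in an old-to-new mapping and collect the ones that are missing. The
# helper name and the example dictionaries below are hypothetical, not Resolwe API.
def _map_reference_sketch(reference, id_mapping, missing_data):
    """Return the new ID for ``reference`` or None, recording missing keys."""
    try:
        return id_mapping[reference]
    except KeyError:
        missing_data.add(reference)
        return None

_id_mapping = {101: 5001, 102: 5002}   # old data ID -> new data ID (made up)
_missing = set()
assert _map_reference_sketch(101, _id_mapping, _missing) == 5001
assert _map_reference_sketch(999, _id_mapping, _missing) is None
assert _missing == {999}
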
def add_dependency(value):
    """Add parent Data dependency."""
    # Nested helper: closes over ``self`` from the enclosing method.
    try:
        self.parents.add(Data.objects.get(pk=value))  # pylint: disable=no-member
    except Data.DoesNotExist:
        pass
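
# --- Hedged sketch (added) ------------------------------------------------------
# Models the "skip missing parents" behaviour of add_dependency without Django:
# here a lookup that returns None stands in for Data.objects.get raising
# DoesNotExist. All names below are hypothetical illustrations.
def _collect_parents_sketch(input_data_ids, lookup):
    """Collect existing parents, silently skipping IDs that cannot be found."""
    parents = set()
    for value in input_data_ids:
        parent = lookup(value)  # stand-in for Data.objects.get(pk=value)
        if parent is not None:
            parents.add(parent)
    return parents

assert _collect_parents_sketch([1, 2], {1: "data-1", 2: "data-2"}.get) == {"data-1", "data-2"}
assert _collect_parents_sketch([3], {}.get) == set()
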
.. code-block:: none

    {
        'command': 'missing_data_locations',
        'data_id': [id of the :class:`~resolwe.flow.models.Data` object]
    }
"""
data_id = obj[ExecutorProtocol.DATA_ID]
logger.debug(
    __("Handling get missing data location"),
    extra={"data_id": data_id, "packet": obj},
)

try:
    data = Data.objects.get(pk=data_id)
except Data.DoesNotExist:
    logger.error(
        "Data object does not exist (handle_missing_data_locations).",
        extra={"data_id": data_id},
    )
    self._abort_processing(obj)
    return

missing_data = []
dependencies = (
    Data.objects.filter(
        children_dependency__child=data,
        children_dependency__kind=DataDependency.KIND_IO,
    )
    .exclude(location__isnull=True)
    .distinct()
)
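
# --- Hedged sketch (added) ------------------------------------------------------
# Example of the packet shape described in the docstring above: the handler reads
# the Data object's primary key from the field addressed by ExecutorProtocol.DATA_ID,
# which the docstring documents as the 'data_id' key. The concrete value and the
# dispatch call mentioned in the comment are illustrative assumptions.
example_packet = {
    "command": "missing_data_locations",
    "data_id": 42,  # primary key of a resolwe.flow.models.Data object (made up)
}
# A listener would hand such a packet to the handler, roughly:
#   listener.handle_missing_data_locations(example_packet)
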
def extend_settings(self, data_id, files, secrets):
    """Prevent processes requiring access to secrets from being run."""
    process = Data.objects.get(pk=data_id).process
    if process.requirements.get("resources", {}).get("secrets", False):
        raise PermissionDenied(
            "Process which requires access to secrets cannot be run using the local executor"
        )
    return super().extend_settings(data_id, files, secrets)
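
# --- Hedged sketch (added) ------------------------------------------------------
# Illustrates the requirements check in extend_settings: a process whose requirements
# declare resources.secrets = True is rejected. The helper mirrors the
# .get("resources", {}).get("secrets", False) chain above; the example requirement
# dictionaries are made up.
def _requires_secrets_sketch(requirements):
    """Return True when the process declares that it needs access to secrets."""
    return requirements.get("resources", {}).get("secrets", False)

assert _requires_secrets_sketch({"resources": {"secrets": True}}) is True
assert _requires_secrets_sketch({"resources": {"cores": 4}}) is False
assert _requires_secrets_sketch({}) is False
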
    # Fragment: tail of the single-reference 'data:' branch of the hydration loop.
    # The `if hydrate_values:` guard is reconstructed from the parallel 'list:data:'
    # branch below; the enclosing loop and branch header are not part of this snippet.
    if hydrate_values:
        _hydrate_values(output, data.process.output_schema, data)
        # _hydrate_values(static, data.static_schema, data)
    output["__id"] = data.id
    output["__type"] = data.process.type
    fields[name] = output

elif field_schema['type'].startswith('list:data:'):
    outputs = []
    for val in value:
        # if re.match('^[0-9a-fA-F]{24}$', str(val)) is None:
        #     print "ERROR: data:<...> value in {}, type \"{}\" not ObjectId but {}.".format(
        #         name, field_schema['type'], val)
        if val is None:
            continue
        data = Data.objects.get(id=val)
        output = copy.deepcopy(data.output)
        # static = Data.static.to_python(data.static)
        if hydrate_values:
            _hydrate_values(output, data.process.output_schema, data)
            # _hydrate_values(static, data.static_schema, data)
        output["__id"] = data.id
        output["__type"] = data.process.type
        outputs.append(output)
    fields[name] = outputs
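
# --- Hedged sketch (added) ------------------------------------------------------
# Shows the shape produced by the hydration code above: each referenced Data object's
# output dict is copied and annotated with "__id" and "__type". The concrete values
# below are invented for illustration only.
referenced_output = {"bam": {"file": "sample.bam"}}   # hypothetical data.output
hydrated = dict(referenced_output)
hydrated["__id"] = 42                                  # data.id
hydrated["__type"] = "data:alignment:bam:"             # data.process.type
# For 'list:data:' fields, one such dict is built per referenced ID and the field
# value becomes the list of hydrated dicts.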