# This handler runs asynchronously with respect to the main Django code
# here; the manager can get nudged from elsewhere.
with transaction.atomic():
    parent_data = Data.objects.get(pk=data_id)

    # Spawn processes.
    for d in obj[ExecutorProtocol.FINISH_SPAWN_PROCESSES]:
        d["contributor"] = parent_data.contributor
        d["process"] = Process.objects.filter(slug=d["process"]).latest()
        d["tags"] = parent_data.tags
        d["collection"] = parent_data.collection
        d["subprocess_parent"] = parent_data

        for field_schema, fields in iterate_fields(
            d.get("input", {}), d["process"].input_schema
        ):
            type_ = field_schema["type"]
            name = field_schema["name"]
            value = fields[name]

            if type_ == "basic:file:":
                fields[name] = self.hydrate_spawned_files(
                    exported_files_mapper, value, data_id
                )
            elif type_ == "list:basic:file:":
                fields[name] = [
                    self.hydrate_spawned_files(exported_files_mapper, fn, data_id)
                    for fn in value
                ]
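
# Illustration (not from the excerpt above): a minimal sketch of what
# iterate_fields yields. For every schema entry whose name appears in the
# values dict it yields a (field_schema, fields) pair, so that
# fields[field_schema["name"]] is the current value. The schema and values
# below are made up for the example.
from resolwe.flow.utils import iterate_fields

example_schema = [
    {"name": "reads", "type": "data:reads:fastq:"},
    {"name": "threads", "type": "basic:integer:"},
]
example_values = {"reads": 42, "threads": 4}

for field_schema, fields in iterate_fields(example_values, example_schema):
    print(field_schema["type"], fields[field_schema["name"]])
# -> data:reads:fastq: 42
# -> basic:integer: 4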

import copy


def hydrate_input_references(input_, input_schema, hydrate_values=True):
    """Hydrate ``input_`` with linked data.

    Find fields with complex data:<...> types in ``input_``.
    Assign an output of the corresponding data object to those fields.
    """
    from .data import Data  # prevent circular import

    for field_schema, fields in iterate_fields(input_, input_schema):
        name = field_schema["name"]
        value = fields[name]
        if "type" in field_schema:
            if field_schema["type"].startswith("data:"):
                if value is None:
                    continue
                try:
                    data = Data.objects.get(id=value)
                except Data.DoesNotExist:
                    fields[name] = {}
                    continue

                output = copy.deepcopy(data.output)
                hydrate_input_references(output, data.process.output_schema)
                if hydrate_values:
                    # Fill in file and storage values; the helpers doing
                    # this are excerpted further below.
                    _hydrate_values(output, data.process.output_schema, data)

import os


def subfiles(root):
    """List all files and directories under ``root``, relative to it."""
    subs = []
    for path, dirs, files in os.walk(root, topdown=False):
        path = path[len(root) + 1 :]
        subs.extend(os.path.join(path, f) for f in files)
        subs.extend(os.path.join(path, d) for d in dirs)
    return subs
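
# Usage sketch (hypothetical layout): given root/a.txt and root/sub/b.txt,
# subfiles returns paths relative to root, deepest entries first because
# os.walk is called with topdown=False:
#     subfiles("/tmp/demo")  ->  ["sub/b.txt", "sub", "a.txt"]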

unreferenced_files = subfiles(root)

# The executor's log and JSON-output files are always kept.
remove_file("jsonout.txt", unreferenced_files)
remove_file("stderr.txt", unreferenced_files)
remove_file("stdout.txt", unreferenced_files)
meta_fields = [[output, output_schema], [descriptor, descriptor_schema]]
for meta_field, meta_field_schema in meta_fields:
    for field_schema, fields in iterate_fields(meta_field, meta_field_schema):
        if "type" in field_schema:
            field_type = field_schema["type"]
            field_name = field_schema["name"]

            # Remove basic:file: entries.
            if field_type.startswith("basic:file:"):
                remove_file(fields[field_name]["file"], unreferenced_files)
            # Remove list:basic:file: entries.
            elif field_type.startswith("list:basic:file:"):
                for field in fields[field_name]:
                    remove_file(field["file"], unreferenced_files)
            # Remove basic:dir: entries.
            elif field_type.startswith("basic:dir:"):
                remove_tree(fields[field_name]["dir"], unreferenced_files)
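
# Hypothetical sketch of the remove_file helper used above (the real
# Resolwe helper may also handle globs and nested paths): it drops a
# still-referenced path from the unreferenced candidates so the purge
# step will not delete it.
def _remove_file_sketch(fn, unreferenced_files):
    if fn in unreferenced_files:
        unreferenced_files.remove(fn)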

def hydrate_path(file_name):
    """Return the resolved path of ``file_name``, tagged with its origin."""
    # ``data`` is supplied by the enclosing function's scope.

    class HydratedPath(str):
        """String that also records which Data object it came from."""

        def __new__(cls, value=""):
            """Initialize hydrated path."""
            hydrated = str.__new__(cls, value)
            hydrated.data_id = data.id
            hydrated.file_name = file_name
            return hydrated

    return HydratedPath(manager.get_executor().resolve_data_path(data, file_name))


def hydrate_storage(storage_id):
    """Hydrate storage fields."""
    from .storage import LazyStorageJSON  # Prevent circular import.

    return LazyStorageJSON(pk=storage_id)
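
# Usage sketch: constructing the proxy is cheap; the assumption (suggested
# by the class name) is that LazyStorageJSON fetches the Storage row only
# when its JSON content is first accessed.
lazy_json = hydrate_storage(storage_id=42)  # illustrative pk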

for field_schema, fields in iterate_fields(output, output_schema):
    name = field_schema["name"]
    value = fields[name]
    if "type" in field_schema:
        if field_schema["type"].startswith("basic:file:"):
            value["file"] = hydrate_path(value["file"])
            value["refs"] = [hydrate_path(ref) for ref in value.get("refs", [])]
        elif field_schema["type"].startswith("list:basic:file:"):
            for obj in value:
                obj["file"] = hydrate_path(obj["file"])
                obj["refs"] = [hydrate_path(ref) for ref in obj.get("refs", [])]
        elif field_schema["type"].startswith("basic:dir:"):
            value["dir"] = hydrate_path(value["dir"])
            value["refs"] = [hydrate_path(ref) for ref in value.get("refs", [])]

def save_dependencies(self, instance, schema):
    """Save data: and list:data: references as parents."""

    def add_dependency(value):
        """Add parent Data dependency."""
        try:
            DataDependency.objects.update_or_create(
                parent=Data.objects.get(pk=value),
                child=self,
                defaults={"kind": DataDependency.KIND_IO},
            )
        except Data.DoesNotExist:
            pass

    for field_schema, fields in iterate_fields(instance, schema):
        name = field_schema["name"]
        value = fields[name]

        if field_schema.get("type", "").startswith("data:"):
            add_dependency(value)
        elif field_schema.get("type", "").startswith("list:data:"):
            for data in value:
                add_dependency(data)
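
# Hypothetical call site (sketch): when a Data object is saved, its input
# references are registered as IO dependencies, e.g.
# data.save_dependencies(data.input, data.process.input_schema)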

def referenced_schema_files(fields, schema):
    """Get the list of files and directories referenced by fields.

    :return: tuple of lists, the first list containing files and
        directories referenced in data.output.
    :rtype: Tuple[List[str], List[str]]
    """
    refs = []
    for field_schema, fields in iterate_fields(fields, schema):
        if "type" in field_schema:
            field_type = field_schema["type"]
            field_name = field_schema["name"]

            # Add basic:file: entries.
            if field_type.startswith("basic:file:"):
                refs.append(fields[field_name]["file"])
                refs += fields[field_name].get("refs", [])
            # Add list:basic:file: entries.
            elif field_type.startswith("list:basic:file:"):
                for field in fields[field_name]:
                    refs.append(field["file"])
                    refs += field.get("refs", [])
            # Add basic:dir: entries.
            elif field_type.startswith("basic:dir:"):
                refs.append(fields[field_name]["dir"])
                refs += fields[field_name].get("refs", [])
if type_.startswith("data:"):
    validate_data(field, type_)
elif type_.startswith("list:data:"):
    for data_id in field:
        validate_data(data_id, type_[5:])  # remove `list:` from the type
elif type_ == "basic:integer:" or type_ == "basic:decimal:":
    validate_range(field, _schema.get("range"), name)
elif type_ == "list:basic:integer:" or type_ == "list:basic:decimal:":
    for obj in field:
        validate_range(obj, _schema.get("range"), name)

try:
    # Check that schema definitions exist for all fields.
    for _, _ in iterate_fields(instance, schema):
        pass
except KeyError as ex:
    raise ValidationError(str(ex))

if is_dirty:
    dirty_fields = ['"{}"'.format(field) for field in dirty_fields]
    raise DirtyError("Required fields {} not given.".format(", ".join(dirty_fields)))
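
# Hypothetical shape of the range check used above (illustration only;
# Resolwe's actual validate_range may differ): reject values outside the
# inclusive interval declared in the field schema.
def _validate_range_sketch(value, interval, name):
    if interval and not interval[0] <= value <= interval[1]:
        raise ValidationError(
            "Value of field '{}' is out of range ({}, {}).".format(
                name, interval[0], interval[1]
            )
        )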

def remove_total_size(apps, schema_editor):
    """Remove the ``total_size`` field from all file/dir-type outputs."""
    Data = apps.get_model("flow", "Data")
    for data in Data.objects.all():
        for field_schema, fields in iterate_fields(
            data.output, data.process.output_schema
        ):
            name = field_schema["name"]
            value = fields[name]
            if "type" in field_schema:
                if field_schema["type"].startswith("basic:file:"):
                    del value["total_size"]
                elif field_schema["type"].startswith("list:basic:file:"):
                    for obj in value:
                        del obj["total_size"]
                elif field_schema["type"].startswith("basic:dir:"):
                    del value["total_size"]
                elif field_schema["type"].startswith("list:basic:dir:"):
                    for obj in value:
                        del obj["total_size"]
        data.save()
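
# Sketch of how a data migration like remove_total_size is typically wired
# up with Django's RunPython (the dependency below is a placeholder, not
# the real migration name):
from django.db import migrations


class Migration(migrations.Migration):

    dependencies = [("flow", "0001_initial")]

    operations = [
        migrations.RunPython(remove_total_size, migrations.RunPython.noop),
    ]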

def get_collection_of_input_entities(data):
    """Get the collection that contains all "entity inputs" of a given data.

    An "entity input" is an input data object that is part of an entity.
    """
    # Prevent circular imports:
    from resolwe.flow.models import Collection

    data_ids = set()

    for field_schema, fields in iterate_fields(data.input, data.process.input_schema):
        name = field_schema["name"]
        value = fields[name]
        if "type" not in field_schema:
            continue

        if field_schema["type"].startswith("data:"):
            value = [value]
        elif not field_schema["type"].startswith("list:data:"):
            continue

        data_ids.update([val for val in value if val is not None])

    collections = Collection.objects.filter(
        data__in=list(data_ids), data__entity__isnull=False
    ).distinct()
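
# The filter above selects collections containing at least one of the
# input data objects where that data also belongs to an entity, and
# distinct() collapses duplicates from the reverse join. The excerpt ends
# here; the full function presumably verifies that exactly one such
# collection exists before returning it.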